incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [9/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
new file mode 100644
index 0000000..bc898b4
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -0,0 +1,972 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.PRIME_DOC;
+import static org.apache.blur.utils.BlurConstants.RECORD_ID;
+import static org.apache.blur.utils.BlurConstants.ROW_ID;
+import static org.apache.blur.utils.BlurUtil.findRecordMutation;
+import static org.apache.blur.utils.BlurUtil.readFilter;
+import static org.apache.blur.utils.BlurUtil.readQuery;
+import static org.apache.blur.utils.RowDocumentUtil.getColumns;
+import static org.apache.blur.utils.RowDocumentUtil.getRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.EscapeRewrite;
+import org.apache.blur.lucene.search.FacetQuery;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableSearcher;
+import org.apache.blur.manager.results.MergerBlurResultIterable;
+import org.apache.blur.manager.status.QueryStatus;
+import org.apache.blur.manager.status.QueryStatusManager;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.MutationHelper;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.ExpertQuery;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.FetchRowResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.TermDocIterable;
+import org.apache.blur.utils.BlurExecutorCompletionService.Cancel;
+import org.apache.blur.utils.ForkJoin.Merger;
+import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.document.FieldSelectorResult;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermEnum;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.FilteredQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.ReaderUtil;
+
+
+public class IndexManager {
+
+  private static final String NOT_FOUND = "NOT_FOUND";
+  private static final Log LOG = LogFactory.getLog(IndexManager.class);
+
+  private IndexServer _indexServer;
+  private ExecutorService _executor;
+  private ExecutorService _mutateExecutor;
+  private int _threadCount;
+  private QueryStatusManager _statusManager = new QueryStatusManager();
+  private boolean _closed;
+  private BlurPartitioner<BytesWritable, Void> _blurPartitioner = new BlurPartitioner<BytesWritable, Void>();
+  private BlurFilterCache _filterCache = new DefaultBlurFilterCache();
+  private BlurMetrics _blurMetrics;
+  private long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
+
+  /**
+   * Sets Lucene's maximum boolean clause count.  Note this is a JVM-global
+   * (static) setting on BooleanQuery, so it affects every query in process.
+   */
+  public void setMaxClauseCount(int maxClauseCount) {
+    BooleanQuery.setMaxClauseCount(maxClauseCount);
+  }
+
+  /**
+   * Creates the query and mutate thread pools and starts the query status
+   * manager.  Must be called before the manager is used.
+   */
+  public void init() {
+    _executor = Executors.newThreadPool("index-manager", _threadCount);
+    // NOTE(review): the mutate pool is already separate from the query pool;
+    // the old TODO looks stale.  Both pools share _threadCount -- confirm
+    // whether the mutate pool should be sized independently.
+    _mutateExecutor = Executors.newThreadPool("index-manager-mutate", _threadCount);
+    _statusManager.init();
+    LOG.info("Init Complete");
+  }
+
+  /**
+   * Shuts this manager down exactly once: stops the status manager, both
+   * thread pools, and the index server.  Safe to call more than once.
+   */
+  public synchronized void close() {
+    if (_closed) {
+      return;
+    }
+    _closed = true;
+    _statusManager.close();
+    _executor.shutdownNow();
+    _mutateExecutor.shutdownNow();
+    _indexServer.close();
+  }
+
+  public void fetchRow(String table, Selector selector, FetchResult fetchResult) throws BlurException {
+    validSelector(selector);
+    BlurIndex index;
+    try {
+      if (selector.getLocationId() == null) {
+        populateSelector(table, selector);
+      }
+      String locationId = selector.getLocationId();
+      if (locationId.equals(NOT_FOUND)) {
+        fetchResult.setDeleted(false);
+        fetchResult.setExists(false);
+        return;
+      }
+      String shard = getShard(locationId);
+      Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
+      if (blurIndexes == null) {
+        LOG.error("Table [{0}] not found", table);
+        throw new BlurException("Table [" + table + "] not found", null);
+      }
+      index = blurIndexes.get(shard);
+      if (index == null) {
+        if (index == null) {
+          LOG.error("Shard [{0}] not found in table [{1}]", shard, table);
+          throw new BlurException("Shard [" + shard + "] not found in table [" + table + "]", null);
+        }
+      }
+    } catch (BlurException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get the correct index reader for selector [{0}].", e, selector);
+      throw new BException(e.getMessage(), e);
+    }
+    IndexReader reader = null;
+    try {
+      reader = index.getIndexReader();
+      fetchRow(reader, table, selector, fetchResult);
+      if (_blurMetrics != null) {
+        if (fetchResult.rowResult != null) {
+          if (fetchResult.rowResult.row != null && fetchResult.rowResult.row.records != null) {
+            _blurMetrics.recordReads.addAndGet(fetchResult.rowResult.row.records.size());
+          }
+          _blurMetrics.rowReads.incrementAndGet();
+        } else if (fetchResult.recordResult != null) {
+          _blurMetrics.recordReads.incrementAndGet();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to fetch row.", e);
+      throw new BException(e.getMessage(), e);
+    } finally {
+      if (reader != null) {
+        // this will allow for closing of index
+        try {
+          reader.decRef();
+        } catch (IOException e) {
+          LOG.error("Unknown error trying to call decRef on reader [{0}]", e, reader);
+        }
+      }
+    }
+  }
+
+  /**
+   * Resolves the selector's locationId (shard/luceneDocId) by searching the
+   * owning shard for the prime doc of the row, or the specific record when
+   * selector.recordOnly is set.  Sets locationId to NOT_FOUND when no
+   * matching live document exists.
+   *
+   * @throws BlurException if the computed shard is not served by this server.
+   */
+  private void populateSelector(String table, Selector selector) throws IOException, BlurException {
+    String rowId = selector.rowId;
+    String recordId = selector.recordId;
+    String shardName = MutationHelper.getShardName(table, rowId, getNumberOfShards(table), _blurPartitioner);
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    BlurIndex blurIndex = indexes.get(shardName);
+    if (blurIndex == null) {
+      // Fixed typo ("servered by this shardserver") and aligned wording with
+      // the equivalent messages elsewhere in this class.
+      throw new BlurException("Shard [" + shardName + "] is not being served by this server.", null);
+    }
+    IndexReader reader = blurIndex.getIndexReader();
+    try {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      BooleanQuery query = new BooleanQuery();
+      if (selector.recordOnly) {
+        query.add(new TermQuery(new Term(RECORD_ID, recordId)), Occur.MUST);
+        query.add(new TermQuery(new Term(ROW_ID, rowId)), Occur.MUST);
+      } else {
+        // Whole-row fetch: locate the row's prime doc.
+        query.add(new TermQuery(new Term(ROW_ID, rowId)), Occur.MUST);
+        query.add(new TermQuery(BlurConstants.PRIME_DOC_TERM), Occur.MUST);
+      }
+      TopDocs topDocs = searcher.search(query, 1);
+      if (topDocs.totalHits > 1) {
+        // More than one live prime doc indicates an index consistency problem.
+        if (selector.recordOnly) {
+          LOG.warn("Rowid [" + rowId + "], recordId [" + recordId + "] has more than one prime doc that is not deleted.");
+        } else {
+          LOG.warn("Rowid [" + rowId + "] has more than one prime doc that is not deleted.");
+        }
+      }
+      if (topDocs.totalHits == 1) {
+        selector.setLocationId(shardName + "/" + topDocs.scoreDocs[0].doc);
+      } else {
+        selector.setLocationId(NOT_FOUND);
+      }
+    } finally {
+      // decRef (not close) so the shared reader can be closed by its owner.
+      reader.decRef();
+    }
+  }
+
+  /**
+   * Validates the selector's field combination: either a locationId alone,
+   * or a rowId (optionally with a recordId when recordOnly is true).
+   *
+   * @throws BlurException when conflicting or insufficient fields are set.
+   */
+  public static void validSelector(Selector selector) throws BlurException {
+    String locationId = selector.locationId;
+    String rowId = selector.rowId;
+    String recordId = selector.recordId;
+    boolean recordOnly = selector.recordOnly;
+
+    if (locationId != null) {
+      // locationId is exclusive of rowId/recordId.
+      if (recordId != null && rowId != null) {
+        throw new BlurException("Invalid selector locationId [" + locationId + "] and recordId [" + recordId + "] and rowId [" + rowId
+            + "] are set, if using locationId, then rowId and recordId are not needed.", null);
+      } else if (recordId != null) {
+        // Fixed typo in message ("sre set" -> "are set").
+        throw new BlurException("Invalid selector locationId [" + locationId + "] and recordId [" + recordId + "] are set, if using locationId recordId is not needed.", null);
+      } else if (rowId != null) {
+        throw new BlurException("Invalid selector locationId [" + locationId + "] and rowId [" + rowId + "] are set, if using locationId rowId is not needed.", null);
+      }
+    } else {
+      if (rowId != null && recordId != null) {
+        if (!recordOnly) {
+          throw new BlurException("Invalid both rowid [" + rowId + "] and recordId [" + recordId + "] are set, and recordOnly is set to [false].  "
+              + "If you want entire row, then remove recordId, if you want record only set recordOnly to [true].", null);
+        }
+      } else if (recordId != null) {
+        throw new BlurException("Invalid recordId [" + recordId + "] is set but rowId is not set.  If rowId is not known then a query will be required.", null);
+      }
+    }
+  }
+
+  /**
+   * Extracts the shard name from a location id.  Location id format is
+   * <shard>/luceneid.
+   *
+   * @param locationId shard-qualified document location.
+   * @return the shard portion of the location id.
+   */
+  private String getShard(String locationId) {
+    String[] parts = locationId.split("\\/");
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Location id invalid [" + locationId + "]");
+    }
+    return parts[0];
+  }
+
+  /**
+   * Executes a query across all locally served shards of the table and
+   * merges the per-shard results.  Facet counts, when requested, are
+   * accumulated into facetedCounts.
+   *
+   * @param table the table to query.
+   * @param blurQuery either a simple (parsed) or expert (pre-serialized) query.
+   * @param facetedCounts per-facet hit counters updated during the run.
+   * @return merged, lazily evaluated results across all shards.
+   */
+  public BlurResultIterable query(final String table, final BlurQuery blurQuery, AtomicLongArray facetedCounts) throws Exception {
+    final AtomicBoolean running = new AtomicBoolean(true);
+    final QueryStatus status = _statusManager.newQueryStatus(table, blurQuery, _threadCount, running);
+    // Fixed: guard the metrics update like fetchRow does -- _blurMetrics may
+    // be unset, and the unguarded increment would NPE.
+    if (_blurMetrics != null) {
+      _blurMetrics.queriesExternal.incrementAndGet();
+    }
+    try {
+      Map<String, BlurIndex> blurIndexes;
+      try {
+        blurIndexes = _indexServer.getIndexes(table);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to fetch index readers.", e);
+        throw new BException(e.getMessage(), e);
+      }
+      Analyzer analyzer = _indexServer.getAnalyzer(table);
+      ParallelCall<Entry<String, BlurIndex>, BlurResultIterable> call;
+      if (isSimpleQuery(blurQuery)) {
+        SimpleQuery simpleQuery = blurQuery.simpleQuery;
+        Filter preFilter = QueryParserUtil.parseFilter(table, simpleQuery.preSuperFilter, false, analyzer, _filterCache);
+        Filter postFilter = QueryParserUtil.parseFilter(table, simpleQuery.postSuperFilter, true, analyzer, _filterCache);
+        Query userQuery = QueryParserUtil.parseQuery(simpleQuery.queryStr, simpleQuery.superQueryOn, analyzer, postFilter, preFilter, getScoreType(simpleQuery.type));
+        Query facetedQuery = getFacetedQuery(blurQuery, userQuery, facetedCounts, analyzer);
+        call = new SimpleQueryParallelCall(running, table, status, _indexServer, facetedQuery, blurQuery.selector, _blurMetrics);
+      } else {
+        // Expert query: query and filter arrive pre-serialized on the request.
+        Query query = getQuery(blurQuery.expertQuery);
+        Filter filter = getFilter(blurQuery.expertQuery);
+        Query userQuery;
+        if (filter != null) {
+          userQuery = new FilteredQuery(query, filter);
+        } else {
+          userQuery = query;
+        }
+        Query facetedQuery = getFacetedQuery(blurQuery, userQuery, facetedCounts, analyzer);
+        call = new SimpleQueryParallelCall(running, table, status, _indexServer, facetedQuery, blurQuery.selector, _blurMetrics);
+      }
+      MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
+      return ForkJoin.execute(_executor, blurIndexes.entrySet(), call, new Cancel() {
+        @Override
+        public void cancel() {
+          running.set(false);
+        }
+      }).merge(merger);
+    } finally {
+      // Always clear the status entry, even on failure or cancellation.
+      _statusManager.removeStatus(status);
+    }
+  }
+
+  /** Deserializes the expert query's binary filter via BlurUtil.readFilter. */
+  private Filter getFilter(ExpertQuery expertQuery) throws BException {
+    return readFilter(expertQuery.getFilter());
+  }
+
+  /** Deserializes the expert query's binary query via BlurUtil.readQuery. */
+  private Query getQuery(ExpertQuery expertQuery) throws BException {
+    return readQuery(expertQuery.getQuery());
+  }
+
+  /** A query is "simple" when the parsed SimpleQuery payload is present. */
+  private boolean isSimpleQuery(BlurQuery blurQuery) {
+    return blurQuery.simpleQuery != null;
+  }
+
+  /**
+   * Wraps the user query in a FacetQuery when facets are requested;
+   * otherwise returns the user query unchanged.
+   */
+  private Query getFacetedQuery(BlurQuery blurQuery, Query userQuery, AtomicLongArray counts, Analyzer analyzer) throws ParseException {
+    if (blurQuery.facets != null) {
+      return new FacetQuery(userQuery, getFacetQueries(blurQuery, analyzer), counts);
+    }
+    return userQuery;
+  }
+
+  /**
+   * Parses each facet's query string into a Lucene query with constant
+   * scoring (facets count hits; relevance is irrelevant).
+   * NOTE(review): this dereferences blurQuery.simpleQuery for superQueryOn,
+   * so facets combined with an expert query would NPE here -- confirm that
+   * facets are only supported for simple queries.
+   */
+  private Query[] getFacetQueries(BlurQuery blurQuery, Analyzer analyzer) throws ParseException {
+    int size = blurQuery.facets.size();
+    Query[] queries = new Query[size];
+    for (int i = 0; i < size; i++) {
+      queries[i] = QueryParserUtil.parseQuery(blurQuery.facets.get(i).queryStr, blurQuery.simpleQuery.superQueryOn, analyzer, null, null, ScoreType.CONSTANT);
+    }
+    return queries;
+  }
+
+  /** Defaults a missing score type to SUPER. */
+  private ScoreType getScoreType(ScoreType type) {
+    return type == null ? ScoreType.SUPER : type;
+  }
+
+  /** Requests cancellation of the running query identified by uuid. */
+  public void cancelQuery(String table, long uuid) {
+    _statusManager.cancelQuery(table, uuid);
+  }
+
+  /** Returns status objects for the queries currently running on the table. */
+  public List<BlurQueryStatus> currentQueries(String table) {
+    return _statusManager.currentQueries(table);
+  }
+
+  /** Returns the status of a single query identified by uuid. */
+  public BlurQueryStatus queryStatus(String table, long uuid) throws BlurException {
+    return _statusManager.queryStatus(table, uuid);
+  }
+
+  /** Returns the uuids of all queries the status manager is tracking. */
+  public List<Long> queryStatusIdList(String table) {
+    return _statusManager.queryStatusIdList(table);
+  }
+
+  /**
+   * Reads the row or record at the selector's locationId directly from the
+   * given reader.  The locationId is expected in the form
+   * &lt;shard&gt;/&lt;luceneDocId&gt;; only the numeric docId portion is used here.
+   * Sets exists/deleted flags and fills either recordResult (recordOnly) or
+   * rowResult (full row, gathered via the ROW_ID term's postings).
+   *
+   * @throws RuntimeException when the docId is out of range for the reader.
+   */
+  public static void fetchRow(IndexReader reader, String table, Selector selector, FetchResult fetchResult) throws CorruptIndexException, IOException {
+    fetchResult.table = table;
+    String locationId = selector.locationId;
+    int lastSlash = locationId.lastIndexOf('/');
+    int docId = Integer.parseInt(locationId.substring(lastSlash + 1));
+    if (docId >= reader.maxDoc()) {
+      throw new RuntimeException("Location id [" + locationId + "] with docId [" + docId + "] is not valid.");
+    }
+    if (selector.isRecordOnly()) {
+      // Fetch only the single record document at docId.
+      if (reader.isDeleted(docId)) {
+        fetchResult.exists = false;
+        fetchResult.deleted = true;
+        return;
+      } else {
+        fetchResult.exists = true;
+        fetchResult.deleted = false;
+        Document document = reader.document(docId, getFieldSelector(selector));
+        fetchResult.recordResult = getColumns(document);
+        return;
+      }
+    } else {
+      // Full-row fetch: resolve the rowId from the prime doc, then iterate
+      // every document of the row via the ROW_ID term.
+      if (reader.isDeleted(docId)) {
+        fetchResult.exists = false;
+        fetchResult.deleted = true;
+        return;
+      } else {
+        fetchResult.exists = true;
+        fetchResult.deleted = false;
+        String rowId = getRowId(reader, docId);
+        TermDocs termDocs = reader.termDocs(new Term(ROW_ID, rowId));
+        // NOTE(review): termDocs is handed to TermDocIterable -- presumably
+        // closed there; confirm, as it is not closed in this method.
+        fetchResult.rowResult = new FetchRowResult(getRow(new TermDocIterable(termDocs, reader, getFieldSelector(selector))));
+        return;
+      }
+    }
+  }
+
+  /**
+   * Reads only the ROW_ID field of the given document.  LOAD_AND_BREAK stops
+   * field loading as soon as ROW_ID is read, avoiding the cost of
+   * materializing the rest of the document.
+   */
+  private static String getRowId(IndexReader reader, int docId) throws CorruptIndexException, IOException {
+    Document document = reader.document(docId, new FieldSelector() {
+      private static final long serialVersionUID = 4912420100148752051L;
+
+      @Override
+      public FieldSelectorResult accept(String fieldName) {
+        if (ROW_ID.equals(fieldName)) {
+          return FieldSelectorResult.LOAD_AND_BREAK;
+        }
+        return FieldSelectorResult.NO_LOAD;
+      }
+    });
+    return document.get(ROW_ID);
+  }
+
+  /** Returns the column portion (after the last '.') of a field name. */
+  private static String getColumnName(String fieldName) {
+    int separator = fieldName.lastIndexOf('.');
+    return fieldName.substring(separator + 1);
+  }
+
+  /** Returns the column-family portion (before the last '.') of a field name. */
+  private static String getColumnFamily(String fieldName) {
+    int separator = fieldName.lastIndexOf('.');
+    return fieldName.substring(0, separator);
+  }
+
+  /**
+   * Builds a Lucene FieldSelector honoring the selector's columnFamiliesToFetch
+   * and columnsToFetch restrictions.  Evaluation order matters:
+   * ROW_ID/RECORD_ID are always loaded, PRIME_DOC never is, and when neither
+   * restriction is set every remaining field is loaded.
+   */
+  public static FieldSelector getFieldSelector(final Selector selector) {
+    return new FieldSelector() {
+      private static final long serialVersionUID = 4089164344758433000L;
+
+      @Override
+      public FieldSelectorResult accept(String fieldName) {
+        if (ROW_ID.equals(fieldName)) {
+          return FieldSelectorResult.LOAD;
+        }
+        if (RECORD_ID.equals(fieldName)) {
+          return FieldSelectorResult.LOAD;
+        }
+        if (PRIME_DOC.equals(fieldName)) {
+          return FieldSelectorResult.NO_LOAD;
+        }
+        // No restrictions at all: load everything.
+        if (selector.columnFamiliesToFetch == null && selector.columnsToFetch == null) {
+          return FieldSelectorResult.LOAD;
+        }
+        String columnFamily = getColumnFamily(fieldName);
+        // A whole-family match wins before per-column checks.
+        if (selector.columnFamiliesToFetch != null) {
+          if (selector.columnFamiliesToFetch.contains(columnFamily)) {
+            return FieldSelectorResult.LOAD;
+          }
+        }
+        String columnName = getColumnName(fieldName);
+        if (selector.columnsToFetch != null) {
+          Set<String> columns = selector.columnsToFetch.get(columnFamily);
+          if (columns != null && columns.contains(columnName)) {
+            return FieldSelectorResult.LOAD;
+          }
+        }
+        return FieldSelectorResult.NO_LOAD;
+      }
+    };
+  }
+
+  /** @return the index server supplying shard indexes for this manager. */
+  public IndexServer getIndexServer() {
+    return _indexServer;
+  }
+
+  /** Injects the index server; must be set before the manager is used. */
+  public void setIndexServer(IndexServer indexServer) {
+    this._indexServer = indexServer;
+  }
+
+  /**
+   * Computes the total document frequency of a column value across all
+   * locally served shards of the table.  Each shard's docFreq is computed in
+   * parallel and the counts are summed.
+   *
+   * @param columnFamily may be null, in which case columnName is used alone.
+   */
+  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value) throws Exception {
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    return ForkJoin.execute(_executor, blurIndexes.entrySet(), new ParallelCall<Entry<String, BlurIndex>, Long>() {
+      @Override
+      public Long call(Entry<String, BlurIndex> input) throws Exception {
+        BlurIndex index = input.getValue();
+        IndexReader reader = index.getIndexReader();
+        try {
+          return recordFrequency(reader, columnFamily, columnName, value);
+        } finally {
+          // decRef (not close) so the shared reader can be closed by its owner.
+          reader.decRef();
+        }
+      }
+    }).merge(new Merger<Long>() {
+      @Override
+      public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
+        long total = 0;
+        // Sum each shard's count, bounded by the parallel-call timeout.
+        while (service.getRemainingCount() > 0) {
+          Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, value);
+          total += service.getResultThrowException(future, table, columnFamily, columnName, value);
+        }
+        return total;
+      }
+    });
+  }
+
+  /**
+   * Lists up to {@code size} terms of a column starting at {@code startWith},
+   * merged and sorted across all locally served shards.  Each shard is
+   * scanned in parallel; the merged TreeSet de-duplicates and orders the
+   * union before truncating to size.
+   */
+  public List<String> terms(final String table, final String columnFamily, final String columnName, final String startWith, final short size) throws Exception {
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    return ForkJoin.execute(_executor, blurIndexes.entrySet(), new ParallelCall<Entry<String, BlurIndex>, List<String>>() {
+      @Override
+      public List<String> call(Entry<String, BlurIndex> input) throws Exception {
+        BlurIndex index = input.getValue();
+        IndexReader reader = index.getIndexReader();
+        try {
+          return terms(reader, columnFamily, columnName, startWith, size);
+        } finally {
+          // decRef (not close) so the shared reader can be closed by its owner.
+          reader.decRef();
+        }
+      }
+    }).merge(new Merger<List<String>>() {
+      @Override
+      public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
+        TreeSet<String> terms = new TreeSet<String>();
+        while (service.getRemainingCount() > 0) {
+          Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, startWith, size);
+          terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
+        }
+        return new ArrayList<String>(terms).subList(0, Math.min(size, terms.size()));
+      }
+    });
+  }
+
+  /** Document frequency of the column value within a single reader. */
+  public static long recordFrequency(IndexReader reader, String columnFamily, String columnName, String value) throws IOException {
+    return reader.docFreq(getTerm(columnFamily, columnName, value));
+  }
+
+  /**
+   * Collects up to {@code size} terms of one field from a single reader,
+   * starting at the first term &gt;= startWith (reader.terms positions the
+   * enumeration there).  Stops when the enumeration leaves the field or the
+   * requested count is reached; the TermEnum is always closed.
+   */
+  public static List<String> terms(IndexReader reader, String columnFamily, String columnName, String startWith, short size) throws IOException {
+    Term term = getTerm(columnFamily, columnName, startWith);
+    String field = term.field();
+    List<String> terms = new ArrayList<String>(size);
+    TermEnum termEnum = reader.terms(term);
+    try {
+      do {
+        Term currentTerm = termEnum.term();
+        if (currentTerm == null) {
+          // Enumeration exhausted.
+          return terms;
+        }
+        if (!currentTerm.field().equals(field)) {
+          // Walked past the end of this field's terms.
+          break;
+        }
+        terms.add(currentTerm.text());
+        if (terms.size() >= size) {
+          return terms;
+        }
+      } while (termEnum.next());
+      return terms;
+    } finally {
+      termEnum.close();
+    }
+  }
+
+  /**
+   * Builds the Lucene term for a column, using "family.column" as the field
+   * name, or just the column name when the family is null.
+   *
+   * @throws NullPointerException when columnName is null.
+   */
+  private static Term getTerm(String columnFamily, String columnName, String value) {
+    if (columnName == null) {
+      // Fixed message: the old text "cannot both be null" referred to a
+      // single argument and was nonsensical.
+      throw new NullPointerException("columnName cannot be null.");
+    }
+    if (columnFamily == null) {
+      return new Term(columnName, value);
+    }
+    return new Term(columnFamily + "." + columnName, value);
+  }
+
+  /**
+   * Builds the table schema (column family -&gt; column names) by unioning the
+   * field infos of every locally served shard.  Field names without a '.'
+   * separator (e.g. rowid/recordid system fields) are skipped.
+   */
+  public Schema schema(String table) throws IOException {
+    Schema schema = new Schema().setTable(table);
+    schema.columnFamilies = new TreeMap<String, Set<String>>();
+    Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
+    for (BlurIndex blurIndex : blurIndexes.values()) {
+      IndexReader reader = blurIndex.getIndexReader();
+      try {
+        FieldInfos mergedFieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+        for (FieldInfo fieldInfo : mergedFieldInfos) {
+          String fieldName = fieldInfo.name;
+          int index = fieldName.indexOf('.');
+          if (index > 0) {
+            String columnFamily = fieldName.substring(0, index);
+            String column = fieldName.substring(index + 1);
+            Set<String> set = schema.columnFamilies.get(columnFamily);
+            if (set == null) {
+              set = new TreeSet<String>();
+              schema.columnFamilies.put(columnFamily, set);
+            }
+            set.add(column);
+          }
+        }
+      } finally {
+        // decRef (not close) so the shared reader can be closed by its owner.
+        reader.decRef();
+      }
+    }
+    return schema;
+  }
+
+  /** Configures how long finished query statuses are retained before cleanup. */
+  public void setStatusCleanupTimerDelay(long delay) {
+    _statusManager.setStatusCleanupTimerDelay(delay);
+  }
+
+  /**
+   * Applies a single row mutation on the mutate thread pool and blocks until
+   * it completes.
+   *
+   * @throws BlurException wrapping any failure of the underlying mutation.
+   */
+  public void mutate(final RowMutation mutation) throws BlurException, IOException {
+    Future<Void> future = _mutateExecutor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        doMutate(mutation);
+        return null;
+      }
+    });
+    try {
+      future.get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers up the stack can observe it.
+      Thread.currentThread().interrupt();
+      throw new BException("Unknown error during mutation", e);
+    } catch (ExecutionException e) {
+      throw new BException("Unknown error during mutation", e.getCause());
+    }
+  }
+
+  /**
+   * Applies a batch of row mutations on the mutate thread pool and blocks
+   * until the whole batch completes.
+   *
+   * @throws BlurException wrapping any failure of the underlying mutations.
+   */
+  public void mutate(final List<RowMutation> mutations) throws BlurException, IOException {
+    Future<Void> future = _mutateExecutor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        long s = System.nanoTime();
+        doMutates(mutations);
+        long e = System.nanoTime();
+        LOG.debug("doMutates took [" + (e - s) / 1000000.0 + " ms] to complete");
+        return null;
+      }
+    });
+    try {
+      future.get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers up the stack can observe it.
+      Thread.currentThread().interrupt();
+      throw new BException("Unknown error during mutation", e);
+    } catch (ExecutionException e) {
+      throw new BException("Unknown error during mutation", e.getCause());
+    }
+  }
+
+  /** Buckets the mutations by target table, then applies each table's batch. */
+  private void doMutates(List<RowMutation> mutations) throws BlurException, IOException {
+    Map<String, List<RowMutation>> perTable = getMutatesPerTable(mutations);
+    for (Entry<String, List<RowMutation>> tableEntry : perTable.entrySet()) {
+      doMutates(tableEntry.getKey(), tableEntry.getValue());
+    }
+  }
+
+  private void doMutates(final String table, List<RowMutation> mutations) throws IOException, BlurException {
+    final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+
+    Map<String, List<RowMutation>> mutationsByShard = new HashMap<String, List<RowMutation>>();
+
+    for (int i = 0; i < mutations.size(); i++) {
+      RowMutation mutation = mutations.get(i);
+      String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
+      List<RowMutation> list = mutationsByShard.get(shard);
+      if (list == null) {
+        list = new ArrayList<RowMutation>();
+        mutationsByShard.put(shard, list);
+      }
+      list.add(mutation);
+    }
+
+    List<Future<Void>> futures = new ArrayList<Future<Void>>();
+
+    for (Entry<String, List<RowMutation>> entry : mutationsByShard.entrySet()) {
+      final String shard = entry.getKey();
+      final List<RowMutation> value = entry.getValue();
+      futures.add(_mutateExecutor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          executeMutates(table, shard, indexes, value);
+          return null;
+        }
+      }));
+    }
+
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        throw new BException("Unknown error during mutation", e);
+      } catch (ExecutionException e) {
+        throw new BException("Unknown error during mutation", e.getCause());
+      }
+    }
+  }
+
+  /**
+   * Applies a batch of mutations destined for a single shard.  Visibility
+   * waiting is deferred to the final mutation: if any mutation in the batch
+   * asked to wait, only the last apply blocks until changes are visible.
+   */
+  private void executeMutates(String table, String shard, Map<String, BlurIndex> indexes, List<RowMutation> mutations) throws BlurException, IOException {
+    long startNanos = System.nanoTime();
+    boolean anyWaitRequested = false;
+    int count = mutations.size();
+    for (int i = 0; i < count; i++) {
+      RowMutation mutation = mutations.get(i);
+      anyWaitRequested = anyWaitRequested || mutation.waitToBeVisible;
+      BlurIndex blurIndex = indexes.get(shard);
+      if (blurIndex == null) {
+        throw new BlurException("Shard [" + shard + "] in table [" + table + "] is not being served by this server.", null);
+      }
+
+      // Only the last mutation in the batch honors the accumulated wait flag.
+      boolean isLast = (i + 1 == count);
+      boolean wait = isLast ? anyWaitRequested : false;
+      RowMutationType type = mutation.rowMutationType;
+      switch (type) {
+      case REPLACE_ROW:
+        Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
+        blurIndex.replaceRow(wait, mutation.wal, row);
+        break;
+      case UPDATE_ROW:
+        doUpdateRowMutation(mutation, blurIndex);
+        break;
+      case DELETE_ROW:
+        blurIndex.deleteRow(wait, mutation.wal, mutation.rowId);
+        break;
+      default:
+        throw new RuntimeException("Not supported [" + type + "]");
+      }
+    }
+    long endNanos = System.nanoTime();
+    LOG.debug("executeMutates took [" + (endNanos - startNanos) / 1000000.0 + " ms] to complete");
+  }
+
+  /**
+   * Groups the given mutations by their target table, preserving the
+   * relative order of mutations within each table.
+   */
+  private Map<String, List<RowMutation>> getMutatesPerTable(List<RowMutation> mutations) {
+    Map<String, List<RowMutation>> byTable = new HashMap<String, List<RowMutation>>();
+    for (RowMutation mutation : mutations) {
+      List<RowMutation> bucket = byTable.get(mutation.table);
+      if (bucket == null) {
+        bucket = new ArrayList<RowMutation>();
+        byTable.put(mutation.table, bucket);
+      }
+      bucket.add(mutation);
+    }
+    return byTable;
+  }
+
+  /**
+   * Applies a single mutation: validates it, routes it to the shard that
+   * owns the rowId, and dispatches on the mutation type.  Unlike the batch
+   * path (executeMutates), the mutation's own waitToBeVisible flag is
+   * honored directly.
+   *
+   * @throws BlurException when the owning shard is not served here.
+   */
+  private void doMutate(RowMutation mutation) throws BlurException, IOException {
+    String table = mutation.table;
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    MutationHelper.validateMutation(mutation);
+    String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
+    BlurIndex blurIndex = indexes.get(shard);
+    if (blurIndex == null) {
+      throw new BlurException("Shard [" + shard + "] in table [" + table + "] is not being served by this server.", null);
+    }
+
+    RowMutationType type = mutation.rowMutationType;
+    switch (type) {
+    case REPLACE_ROW:
+      Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
+      blurIndex.replaceRow(mutation.waitToBeVisible, mutation.wal, row);
+      break;
+    case UPDATE_ROW:
+      doUpdateRowMutation(mutation, blurIndex);
+      break;
+    case DELETE_ROW:
+      blurIndex.deleteRow(mutation.waitToBeVisible, mutation.wal, mutation.rowId);
+      break;
+    default:
+      throw new RuntimeException("Not supported [" + type + "]");
+    }
+  }
+
+  /**
+   * Applies an UPDATE_ROW mutation by fetching the existing row, merging each
+   * record mutation into a newly built replacement row, and then replacing the
+   * whole row in the index.
+   *
+   * @param mutation the UPDATE_ROW mutation to apply.
+   * @param blurIndex the shard index that owns the row.
+   * @throws BException if the row does not exist, or a record mutation targets
+   *           a non-existent record it cannot apply to.
+   */
+  private void doUpdateRowMutation(RowMutation mutation, BlurIndex blurIndex) throws BlurException, IOException {
+    FetchResult fetchResult = new FetchResult();
+    Selector selector = new Selector();
+    // NOTE(review): setAllowStaleDataIsSet(false) marks the thrift field as
+    // "not set" rather than setting its value to false; setAllowStaleData(false)
+    // may have been intended -- confirm.
+    selector.setAllowStaleDataIsSet(false);
+    selector.setRowId(mutation.rowId);
+    fetchRow(mutation.table, selector, fetchResult);
+    if (fetchResult.exists) {
+      // We will examine the contents of the existing row and add records
+      // onto a new replacement row based on the mutation we have been given.
+      Row existingRow = fetchResult.rowResult.row;
+      Row newRow = new Row().setId(existingRow.id);
+
+      // Create a local copy of the mutation we can modify
+      RowMutation mutationCopy = mutation.deepCopy();
+
+      // Match existing records against record mutations. Once a record
+      // mutation has been processed, remove it from our local copy.
+      for (Record existingRecord : existingRow.records) {
+        RecordMutation recordMutation = findRecordMutation(mutationCopy, existingRecord);
+        if (recordMutation != null) {
+          mutationCopy.recordMutations.remove(recordMutation);
+          doUpdateRecordMutation(recordMutation, existingRecord, newRow);
+        } else {
+          // Copy existing records over to the new row unmodified if there
+          // is no matching mutation.
+          newRow.addToRecords(existingRecord);
+        }
+      }
+
+      // Examine all remaining record mutations. For any record replacements
+      // we need to create a new record in the table even though an existing
+      // record did not match. Record deletions are also ok here since the
+      // record is effectively already deleted. Other record mutations are
+      // an error and should generate an exception.
+      for (RecordMutation recordMutation : mutationCopy.recordMutations) {
+        RecordMutationType type = recordMutation.recordMutationType;
+        switch (type) {
+        case DELETE_ENTIRE_RECORD:
+          // do nothing as missing record is already in desired state
+          break;
+        case APPEND_COLUMN_VALUES:
+          throw new BException("Mutation cannot append column values to non-existent record", recordMutation);
+        case REPLACE_ENTIRE_RECORD:
+          newRow.addToRecords(recordMutation.record);
+          break;
+        case REPLACE_COLUMNS:
+          throw new BException("Mutation cannot replace columns in non-existent record", recordMutation);
+        default:
+          throw new RuntimeException("Unsupported record mutation type [" + type + "]");
+        }
+      }
+
+      // Finally, replace the existing row with the new row we have built.
+      blurIndex.replaceRow(mutation.waitToBeVisible, mutation.wal, newRow);
+    } else {
+      throw new BException("Mutation cannot update row that does not exist.", mutation);
+    }
+  }
+
+  /**
+   * Merges a single record mutation with the matching existing record and adds
+   * the resulting record (if any) onto the replacement row being built.
+   * <p/>
+   * DELETE_ENTIRE_RECORD drops the record; APPEND_COLUMN_VALUES appends the
+   * mutation's columns to the existing record (mutating it in place);
+   * REPLACE_ENTIRE_RECORD keeps only the mutation's record; REPLACE_COLUMNS
+   * keeps the mutation's columns plus any existing columns whose names the
+   * mutation does not mention.
+   *
+   * @param recordMutation the mutation to apply.
+   * @param existingRecord the matching record from the existing row.
+   * @param newRow the replacement row being built.
+   */
+  private static void doUpdateRecordMutation(RecordMutation recordMutation, Record existingRecord, Row newRow) {
+    Record mutationRecord = recordMutation.record;
+    RecordMutationType type = recordMutation.recordMutationType;
+    switch (type) {
+    case DELETE_ENTIRE_RECORD:
+      // The record is simply omitted from the new row.
+      return;
+    case APPEND_COLUMN_VALUES:
+      for (Column column : mutationRecord.columns) {
+        existingRecord.addToColumns(column);
+      }
+      newRow.addToRecords(existingRecord);
+      break;
+    case REPLACE_ENTIRE_RECORD:
+      newRow.addToRecords(mutationRecord);
+      break;
+    case REPLACE_COLUMNS:
+      Set<String> columnNames = new HashSet<String>();
+      for (Column column : mutationRecord.columns) {
+        columnNames.add(column.name);
+      }
+      // Carry over only the existing columns the mutation does not replace.
+      for (Column column : existingRecord.columns) {
+        if (columnNames.contains(column.name)) {
+          continue;
+        }
+        mutationRecord.addToColumns(column);
+      }
+      newRow.addToRecords(mutationRecord);
+      break;
+    default:
+      // Fail fast on unknown types, consistent with the other mutation-type
+      // switches in this class (previously this fell through silently).
+      throw new RuntimeException("Unsupported record mutation type [" + type + "]");
+    }
+  }
+
+  // private boolean isSameRecord(Record existingRecord, Record mutationRecord)
+  // {
+  // if (existingRecord.recordId.equals(mutationRecord.recordId)) {
+  // if (existingRecord.family.equals(mutationRecord.family)) {
+  // return true;
+  // }
+  // }
+  // return false;
+  // }
+
+  /** Returns the shard count for the table, as reported by the index server. */
+  private int getNumberOfShards(String table) {
+    return _indexServer.getShardCount(table);
+  }
+
+  /**
+   * A {@link ParallelCall} that executes the configured query against one
+   * shard's index and returns an iterable over the results. The shared running
+   * flag allows an in-flight search to be escaped (aborted).
+   */
+  public static class SimpleQueryParallelCall implements ParallelCall<Entry<String, BlurIndex>, BlurResultIterable> {
+
+    // All collaborators are assigned once in the constructor; final makes the
+    // instance safely shareable across the worker threads that invoke call().
+    private final String _table;
+    private final QueryStatus _status;
+    private final IndexServer _indexServer;
+    private final Query _query;
+    private final Selector _selector;
+    private final BlurMetrics _blurMetrics;
+    private final AtomicBoolean _running;
+
+    public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status, IndexServer indexServer, Query query, Selector selector, BlurMetrics blurMetrics) {
+      _running = running;
+      _table = table;
+      _status = status;
+      _indexServer = indexServer;
+      _query = query;
+      _selector = selector;
+      _blurMetrics = blurMetrics;
+    }
+
+    /**
+     * Runs the query against the given shard entry. Always increments the
+     * internal query metric and detaches the status thread, even on failure.
+     */
+    @Override
+    public BlurResultIterable call(Entry<String, BlurIndex> entry) throws Exception {
+      _status.attachThread();
+      try {
+        BlurIndex index = entry.getValue();
+        IndexReader reader = index.getIndexReader();
+        String shard = entry.getKey();
+        // Wrap the reader so a long-running search can be aborted via _running.
+        IndexReader escapeReader = EscapeRewrite.wrap(reader, _running);
+        IndexSearcher searcher = new IndexSearcher(escapeReader);
+        searcher.setSimilarity(_indexServer.getSimilarity(_table));
+        // Clone before rewrite: the query instance is shared across shards.
+        Query rewrite = searcher.rewrite((Query) _query.clone());
+        // NOTE(review): the unwrapped reader (not escapeReader) is handed to
+        // the result iterable -- presumably so fetches are not escaped; confirm.
+        return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher, _selector, reader);
+      } finally {
+        _blurMetrics.queriesInternal.incrementAndGet();
+        _status.deattachThread();
+      }
+    }
+  }
+
+  /** Sets the number of threads used for parallel shard operations. */
+  public void setThreadCount(int threadCount) {
+    this._threadCount = threadCount;
+  }
+
+  /** Injects the metrics sink used to record query/mutate counters. */
+  public void setBlurMetrics(BlurMetrics blurMetrics) {
+    _blurMetrics = blurMetrics;
+  }
+
+  /** Injects the cache used to reuse parsed pre/post filters per table. */
+  public void setFilterCache(BlurFilterCache filterCache) {
+    _filterCache = filterCache;
+  }
+
+  /**
+   * Optimizes every shard index of the given table down to the requested
+   * number of segments per shard.
+   *
+   * @param table the table name.
+   * @param numberOfSegmentsPerShard target segment count per shard.
+   * @throws BException if the indexes cannot be fetched or optimized.
+   */
+  public void optimize(String table, int numberOfSegmentsPerShard) throws BException {
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    for (BlurIndex blurIndex : blurIndexes.values()) {
+      try {
+        blurIndex.optimize(numberOfSegmentsPerShard);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to optimize indexes.", e);
+        throw new BException(e.getMessage(), e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
new file mode 100644
index 0000000..2a3f937
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
@@ -0,0 +1,177 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.lucene.search.Similarity;
+
+
+public interface IndexServer {
+
+  /** Lifecycle state of a table. */
+  public enum TABLE_STATUS {
+    ENABLED, DISABLED
+  }
+
+  // Server state
+
+  /**
+   * Gets a sorted list of shards being served by this server.
+   * 
+   * @param table
+   *          the table name
+   * @return the sorted list of shards.
+   */
+  SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException;
+
+  /**
+   * Gets a map of the index readers for current running node.
+   * <p/>
+   * Keys are shard names, values are the associated indexes.
+   * 
+   * @param table
+   *          the table name.
+   * @return the map of readers.
+   * @throws IOException
+   */
+  Map<String, BlurIndex> getIndexes(String table) throws IOException;
+
+  // Table Meta Data
+
+  /**
+   * The shard list for a given table.
+   * 
+   * @param table
+   *          the table name.
+   * @return the list of shards.
+   */
+  List<String> getShardList(String table);
+
+  /**
+   * Gets the similarity object used by lucene for this table.
+   * 
+   * @param table
+   *          the table name.
+   * @return the similarity object.
+   */
+  Similarity getSimilarity(String table);
+
+  /**
+   * Gets the status of the table.
+   * 
+   * @param table
+   *          the table name.
+   * @return the status.
+   */
+  TABLE_STATUS getTableStatus(String table);
+
+  /**
+   * Gets the analyzer for the table.
+   * 
+   * @param table
+   *          the table name.
+   * @return the analyzer for lucene.
+   */
+  BlurAnalyzer getAnalyzer(String table);
+
+  /**
+   * Gets the current nodes name.
+   * 
+   * @return the node name.
+   */
+  String getNodeName();
+
+  /**
+   * Gets the table uri. (hdfs://cluster1:9000/blur/tables/tablename1234)
+   * 
+   * @param table
+   *          the table name
+   * @return the uri to the table directory that contains all the shards.
+   */
+  String getTableUri(String table);
+
+  /**
+   * Gets the shard count for the given table.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the shard count.
+   */
+  int getShardCount(String table);
+
+  /**
+   * Gets the compress codec for the given table.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the {@link CompressionCodec}
+   */
+  CompressionCodec getCompressionCodec(String table);
+
+  /**
+   * Get the compression block size.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the block size.
+   */
+  int getCompressionBlockSize(String table);
+
+  // Metrics
+
+  /**
+   * Gets the record count of the table.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the record count.
+   * @throws IOException
+   */
+  long getRecordCount(String table) throws IOException;
+
+  /**
+   * Gets the row count of the table.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the row count.
+   * @throws IOException
+   */
+  long getRowCount(String table) throws IOException;
+
+  /**
+   * Gets the current on disk table size.
+   * 
+   * @param table
+   *          the name of the table.
+   * @return the number of bytes on disk.
+   * @throws IOException
+   */
+  long getTableSize(String table) throws IOException;
+
+  /**
+   * Closes the index server.
+   */
+  void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/QueryParserUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/QueryParserUtil.java b/src/blur-core/src/main/java/org/apache/blur/manager/QueryParserUtil.java
new file mode 100644
index 0000000..dc712be
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/QueryParserUtil.java
@@ -0,0 +1,65 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+
+import org.apache.blur.lucene.search.SuperParser;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.FilteredQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryWrapperFilter;
+
+
+/**
+ * Static helpers for parsing Blur query and filter strings into Lucene
+ * {@link Query} and {@link Filter} objects.
+ */
+public class QueryParserUtil {
+
+  private QueryParserUtil() {
+    // Static utility class; not instantiable.
+  }
+
+  /**
+   * Parses a query string with the {@link SuperParser}, applying the optional
+   * pre filter during parsing and wrapping the result with the optional post
+   * filter.
+   *
+   * @return the parsed (and possibly filtered) query.
+   * @throws ParseException if the query string cannot be parsed.
+   */
+  public static Query parseQuery(String query, boolean superQueryOn, Analyzer analyzer, Filter postFilter, Filter preFilter, ScoreType scoreType) throws ParseException {
+    Query result = new SuperParser(LUCENE_VERSION, analyzer, superQueryOn, preFilter, scoreType).parse(query);
+    if (postFilter == null) {
+      return result;
+    }
+    return new FilteredQuery(result, postFilter);
+  }
+
+  /**
+   * Parses a filter string, consulting (and populating) the per-table filter
+   * cache. Returns null when no filter string is given.
+   * <p/>
+   * Synchronizes on the cache so a given filter is only parsed once.
+   *
+   * @throws ParseException if the filter string cannot be parsed.
+   */
+  public static Filter parseFilter(String table, String filterStr, boolean superQueryOn, Analyzer analyzer, BlurFilterCache filterCache) throws ParseException, BlurException {
+    if (filterStr == null) {
+      return null;
+    }
+    synchronized (filterCache) {
+      Filter filter;
+      if (superQueryOn) {
+        filter = filterCache.fetchPostFilter(table, filterStr);
+      } else {
+        filter = filterCache.fetchPreFilter(table, filterStr);
+      }
+      if (filter != null) {
+        return filter;
+      }
+      // Cache miss: parse the filter with constant scoring and store it.
+      filter = new QueryWrapperFilter(new SuperParser(LUCENE_VERSION, analyzer, superQueryOn, null, ScoreType.CONSTANT).parse(filterStr));
+      if (superQueryOn) {
+        filter = filterCache.storePostFilter(table, filterStr, filter);
+      } else {
+        filter = filterCache.storePreFilter(table, filterStr, filter);
+      }
+      return filter;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ClusterStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ClusterStatus.java
new file mode 100644
index 0000000..f14ef11
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ClusterStatus.java
@@ -0,0 +1,80 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.blur.thrift.generated.TableDescriptor;
+
+
+/**
+ * Read/write view of Blur cluster state: clusters, tables, shard and
+ * controller servers. Most lookups accept a useCache flag that allows a
+ * locally cached answer instead of a fresh one.
+ */
+public abstract class ClusterStatus {
+
+  /** Lists the shard servers currently online in the given cluster. */
+  public abstract List<String> getOnlineShardServers(boolean useCache, String cluster);
+
+  /** Lists the controller servers. */
+  public abstract List<String> getControllerServerList();
+
+  /** Lists all shard servers registered in the given cluster, online or not. */
+  public abstract List<String> getShardServerList(String cluster);
+
+  /** Lists the known clusters. */
+  public abstract List<String> getClusterList(boolean useCache);
+
+  /** Gets the descriptor for a table within a cluster. */
+  public abstract TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table);
+
+  /** Lists tables across all clusters by aggregating per-cluster table lists. */
+  public final List<String> getTableList(boolean useCache) {
+    List<String> tables = new ArrayList<String>();
+    for (String cluster : getClusterList(useCache)) {
+      tables.addAll(getTableList(useCache, cluster));
+    }
+    return tables;
+  }
+
+  /** Resolves which cluster hosts the given table. */
+  public abstract String getCluster(boolean useCache, String table);
+
+  /** Whether the table is enabled. */
+  public abstract boolean isEnabled(boolean useCache, String cluster, String table);
+
+  /** Whether the table exists in the cluster. */
+  public abstract boolean exists(boolean useCache, String cluster, String table);
+
+  /** Whether the cluster is still in safe mode (starting up). */
+  public abstract boolean isInSafeMode(boolean useCache, String cluster);
+
+  /** Lists registered shard servers that are not currently online. */
+  public List<String> getOfflineShardServers(boolean useCache, String cluster) {
+    List<String> shardServerList = new ArrayList<String>(getShardServerList(cluster));
+    shardServerList.removeAll(getOnlineShardServers(useCache, cluster));
+    return shardServerList;
+  }
+
+  /** Gets the shard count configured for the table. */
+  public abstract int getShardCount(boolean useCache, String cluster, String table);
+
+  /** Whether the block cache is enabled for the table. */
+  public abstract boolean isBlockCacheEnabled(String cluster, String table);
+
+  /** Gets the file types covered by the block cache for the table. */
+  public abstract Set<String> getBlockCacheFileTypes(String cluster, String table);
+
+  /** Lists the tables in a single cluster. */
+  public abstract List<String> getTableList(boolean useCache, String cluster);
+
+  /** Whether the table is read only. */
+  public abstract boolean isReadOnly(boolean useCache, String cluster, String table);
+
+  /** Creates a new table from the descriptor. */
+  public abstract void createTable(TableDescriptor tableDescriptor);
+
+  /** Disables the table. */
+  public abstract void disableTable(String cluster, String table);
+
+  /** Enables the table. */
+  public abstract void enableTable(String cluster, String table);
+
+  /** Removes the table, optionally deleting its index files. */
+  public abstract void removeTable(String cluster, String table, boolean deleteIndexFiles);
+
+  /** Whether this status object is open for use. */
+  public abstract boolean isOpen();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
new file mode 100644
index 0000000..81ea5f4
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -0,0 +1,823 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.ColumnPreCache;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchNodeData;
+import org.apache.blur.zookeeper.WatchNodeExistance;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.lucene.search.Similarity;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ZookeeperClusterStatus extends ClusterStatus {
+
+  private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatus.class);
+
+  // Shared ZooKeeper session used by this object and all of its watchers.
+  private ZooKeeper _zk;
+  // True while open; checked before every ZooKeeper call.
+  private AtomicBoolean _running = new AtomicBoolean();
+  // Caches populated by the watchers below.
+  private ConcurrentMap<String, Long> _safeModeMap = new ConcurrentHashMap<String, Long>();
+  private ConcurrentMap<String, List<String>> _onlineShardsNodes = new ConcurrentHashMap<String, List<String>>();
+  private ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String, Set<String>>();
+  private AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new HashSet<String>());
+  // Keyed by "cluster.table" (see getClusterTableKey).
+  private ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String, Boolean>();
+  private ConcurrentMap<String, Boolean> _readOnly = new ConcurrentHashMap<String, Boolean>();
+
+  // Watchers that keep the caches above up to date.
+  private WatchChildren _clusterWatcher;
+  private ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new ConcurrentHashMap<String, WatchChildren>();
+  private ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String, WatchChildren>();
+  private ConcurrentMap<String, WatchNodeExistance> _safeModeWatchers = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchNodeData> _safeModeDataWatchers = new ConcurrentHashMap<String, WatchNodeData>();
+  private ConcurrentMap<String, WatchNodeExistance> _enabledWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchNodeExistance> _readOnlyWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+
+  /**
+   * Creates a cluster status tracker on top of an existing ZooKeeper session
+   * and starts watching the clusters registry.
+   *
+   * @param zooKeeper an open ZooKeeper session.
+   */
+  public ZookeeperClusterStatus(ZooKeeper zooKeeper) {
+    _zk = zooKeeper;
+    _running.set(true);
+    watchForClusters();
+    try {
+      // Give the watchers a moment to populate the caches before callers start
+      // asking questions. NOTE(review): fixed-length sleep is a startup hack.
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Watches the list of clusters. For every new cluster it registers table and
+   * safe-mode watchers; for every cluster that disappears it closes and
+   * removes the corresponding table watcher.
+   */
+  class Clusters extends OnChange {
+    @Override
+    public void action(List<String> clusters) {
+      _clusters.set(new HashSet<String>(clusters));
+      for (String cluster : clusters) {
+        if (!_tableWatchers.containsKey(cluster)) {
+          String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
+          ZkUtils.waitUntilExists(_zk, tablesPath);
+          WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath).watch(new Tables(cluster));
+          _tableWatchers.put(cluster, clusterWatcher);
+          String safemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+          ZkUtils.waitUntilExists(_zk, safemodePath);
+          WatchNodeExistance watchNodeExistance = new WatchNodeExistance(_zk, safemodePath).watch(new SafeExistance(cluster));
+          _safeModeWatchers.put(cluster, watchNodeExistance);
+        }
+      }
+
+      // Close and remove the watchers of clusters that no longer exist. BUG
+      // FIX: this is the set of currently-watched clusters minus the live
+      // cluster list (the original computed the difference backwards, so
+      // stale watchers were never closed).
+      List<String> clustersToCloseAndRemove = new ArrayList<String>(_tableWatchers.keySet());
+      clustersToCloseAndRemove.removeAll(clusters);
+      for (String cluster : clustersToCloseAndRemove) {
+        WatchChildren watcher = _tableWatchers.remove(cluster);
+        if (watcher == null) {
+          LOG.error("Error watcher is null [" + cluster + "] ");
+        } else {
+          watcher.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Watches for the creation of a cluster's safe-mode node. Once it exists, a
+   * data watcher keeps _safeModeMap updated with the node's value
+   * (Long.MIN_VALUE when the value is unset).
+   */
+  class SafeExistance extends WatchNodeExistance.OnChange {
+
+    private String cluster;
+
+    public SafeExistance(String cluster) {
+      this.cluster = cluster;
+    }
+
+    @Override
+    public void action(Stat stat) {
+      if (stat != null) {
+        WatchNodeData watchNodeData = new WatchNodeData(_zk, ZookeeperPathConstants.getSafemodePath(cluster));
+        watchNodeData.watch(new WatchNodeData.OnChange() {
+          @Override
+          public void action(byte[] data) {
+            if (data == null) {
+              LOG.debug("Safe mode value for cluster [" + cluster + "] is not set.");
+              _safeModeMap.put(cluster, Long.MIN_VALUE);
+            } else {
+              String value = new String(data);
+              LOG.debug("Safe mode value for cluster [" + cluster + "] is [" + value + "].");
+              _safeModeMap.put(cluster, Long.parseLong(value));
+            }
+          }
+        });
+        // Replace any previous data watcher for this cluster and close it.
+        WatchNodeData nodeData = _safeModeDataWatchers.put(cluster, watchNodeData);
+        if (nodeData != null) {
+          nodeData.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Watches the table list of a single cluster. For every table seen for the
+   * first time, registers existence watchers that keep the per-table
+   * read-only and enabled caches current.
+   */
+  class Tables extends OnChange {
+    private String cluster;
+
+    public Tables(String cluster) {
+      this.cluster = cluster;
+    }
+
+    @Override
+    public void action(List<String> tables) {
+      Set<String> newSet = new HashSet<String>(tables);
+      Set<String> oldSet = _tablesPerCluster.put(cluster, newSet);
+      Set<String> newTables = getNewTables(newSet, oldSet);
+      for (String table : newTables) {
+        final String clusterTableKey = getClusterTableKey(cluster, table);
+
+        // Read-only is signaled by the mere existence of the read-only node.
+        WatchNodeExistance readOnlyWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableReadOnlyPath(cluster, table));
+        readOnlyWatcher.watch(new WatchNodeExistance.OnChange() {
+          @Override
+          public void action(Stat stat) {
+            if (stat == null) {
+              _readOnly.put(clusterTableKey, Boolean.FALSE);
+            } else {
+              _readOnly.put(clusterTableKey, Boolean.TRUE);
+            }
+          }
+        });
+        // Another thread may have registered first; close the duplicate.
+        if (_readOnlyWatchNodeExistance.putIfAbsent(clusterTableKey, readOnlyWatcher) != null) {
+          readOnlyWatcher.close();
+        }
+
+        // Enabled is likewise signaled by the existence of the enabled node.
+        WatchNodeExistance enabledWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableEnabledPath(cluster, table));
+        enabledWatcher.watch(new WatchNodeExistance.OnChange() {
+          @Override
+          public void action(Stat stat) {
+            if (stat == null) {
+              _enabled.put(clusterTableKey, Boolean.FALSE);
+            } else {
+              _enabled.put(clusterTableKey, Boolean.TRUE);
+            }
+          }
+        });
+        if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {
+          enabledWatcher.close();
+        }
+      }
+    }
+
+    // Returns the tables present in newSet that were absent from oldSet.
+    private Set<String> getNewTables(Set<String> newSet, Set<String> oldSet) {
+      Set<String> newTables = new HashSet<String>(newSet);
+      if (oldSet != null) {
+        newTables.removeAll(oldSet);
+      }
+      return newTables;
+    }
+  }
+
+  /** Registers the top-level watcher on the clusters registry path. */
+  private void watchForClusters() {
+    _clusterWatcher = new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath()).watch(new Clusters());
+  }
+
+  /**
+   * Creates a cluster status tracker with its own ZooKeeper session (30 second
+   * session timeout) built from the connection string, using a no-op watcher.
+   *
+   * @param connectionStr the ZooKeeper connection string.
+   */
+  public ZookeeperClusterStatus(String connectionStr) throws IOException {
+    this(new ZooKeeper(connectionStr, 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    }));
+  }
+
+  /** Builds the "cluster.table" key used by the per-table caches. */
+  private String getClusterTableKey(String cluster, String table) {
+    StringBuilder key = new StringBuilder(cluster);
+    key.append('.').append(table);
+    return key.toString();
+  }
+
+  /**
+   * Lists the known clusters, from the local cache when allowed, otherwise
+   * straight from ZooKeeper. Elapsed time is traced at debug level.
+   */
+  @Override
+  public List<String> getClusterList(boolean useCache) {
+    if (useCache) {
+      return new ArrayList<String>(_clusters.get());
+    }
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      return _zk.getChildren(ZookeeperPathConstants.getClustersPath(), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getClusterList [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  /** Throws a RuntimeException if this status object has been closed. */
+  private void checkIfOpen() {
+    if (!_running.get()) {
+      throw new RuntimeException("not open");
+    }
+  }
+
+  /**
+   * Lists the controller servers currently registered as online in ZooKeeper.
+   * Elapsed time is traced at debug level.
+   */
+  @Override
+  public List<String> getControllerServerList() {
+    long start = System.nanoTime();
+    try {
+      checkIfOpen();
+      return _zk.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long end = System.nanoTime();
+      LOG.debug("trace getControllerServerList [" + (end - start) / 1000000.0 + " ms]");
+    }
+  }
+
+  /**
+   * Lists the online shard servers of a cluster. On a cache miss with useCache
+   * set, a watcher is registered to populate the cache for next time and the
+   * call falls through to ZooKeeper.
+   */
+  @Override
+  public List<String> getOnlineShardServers(boolean useCache, String cluster) {
+    if (useCache) {
+      List<String> shards = _onlineShardsNodes.get(cluster);
+      if (shards != null) {
+        return shards;
+      } else {
+        watchForOnlineShardNodes(cluster);
+      }
+    }
+
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + cluster + "/online/shard-nodes", false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getOnlineShardServers took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  /**
+   * Registers a watcher that keeps _onlineShardsNodes current for the cluster.
+   * Safe to call concurrently; only one watcher per cluster survives.
+   */
+  private void watchForOnlineShardNodes(final String cluster) {
+    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        _onlineShardsNodes.put(cluster, children);
+      }
+    });
+    if (_onlineShardsNodesWatchers.putIfAbsent(cluster, watch) != null) {
+      // There was already a watch created. Close the extra watcher.
+      watch.close();
+    }
+  }
+
+  @Override
+  public List<String> getShardServerList(String cluster) {
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + cluster + "/shard-nodes", false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getShardServerList took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public boolean exists(boolean useCache, String cluster, String table) {
+    if (useCache) {
+      Set<String> tables = _tablesPerCluster.get(cluster);
+      if (tables != null) {
+        if (tables.contains(table)) {
+          return true;
+        }
+      }
+    }
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
+        return false;
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace exists took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public boolean isEnabled(boolean useCache, String cluster, String table) {
+    if (useCache) {
+      Boolean e = _enabled.get(getClusterTableKey(cluster, table));
+      if (e != null) {
+        return e;
+      }
+    }
+    long s = System.nanoTime();
+    String tablePathIsEnabled = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
+    try {
+      checkIfOpen();
+      if (_zk.exists(tablePathIsEnabled, false) == null) {
+        return false;
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace isEnabled took [" + (e - s) / 1000000.0 + " ms]");
+    }
+    return true;
+  }
+
+  private Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String, TableDescriptor>();
+
+  @Override
+  public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table) {
+    if (useCache) {
+      TableDescriptor tableDescriptor = _tableDescriptorCache.get(table);
+      updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
+      if (tableDescriptor != null) {
+        return tableDescriptor;
+      }
+    }
+    long s = System.nanoTime();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    try {
+      checkIfOpen();
+      tableDescriptor.shardCount = Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableShardCountPath(cluster, table))));
+      tableDescriptor.tableUri = new String(getData(ZookeeperPathConstants.getTableUriPath(cluster, table)));
+      tableDescriptor.compressionClass = new String(getData(ZookeeperPathConstants.getTableCompressionCodecPath(cluster, table)));
+      tableDescriptor.compressionBlockSize = Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableCompressionBlockSizePath(cluster, table))));
+      tableDescriptor.analyzerDefinition = fromBytes(getData(ZookeeperPathConstants.getTablePath(cluster, table)), AnalyzerDefinition.class);
+      tableDescriptor.blockCaching = isBlockCacheEnabled(cluster, table);
+      tableDescriptor.blockCachingFileTypes = getBlockCacheFileTypes(cluster, table);
+      tableDescriptor.name = table;
+      tableDescriptor.columnPreCache = fromBytes(getData(ZookeeperPathConstants.getTableColumnsToPreCache(cluster, table)), ColumnPreCache.class);
+      byte[] data = getData(ZookeeperPathConstants.getTableSimilarityPath(cluster, table));
+      if (data != null) {
+        tableDescriptor.similarityClass = new String(data);
+      }
+      updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getTableDescriptor took [" + (e - s) / 1000000.0 + " ms]");
+    }
+    tableDescriptor.cluster = cluster;
+    _tableDescriptorCache.put(table, tableDescriptor);
+    return tableDescriptor;
+  }
+
+  private void updateReadOnlyAndEnabled(boolean useCache, TableDescriptor tableDescriptor, String cluster, String table) {
+    if (tableDescriptor != null) {
+      tableDescriptor.setReadOnly(isReadOnly(useCache, cluster, table));
+      tableDescriptor.setIsEnabled(isEnabled(useCache, cluster, table));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends TBase<?, ?>> T fromBytes(byte[] data, Class<T> clazz) {
+    try {
+      if (data == null) {
+        return null;
+      }
+      TBase<?, ?> base = clazz.newInstance();
+      TMemoryInputTransport trans = new TMemoryInputTransport(data);
+      TJSONProtocol protocol = new TJSONProtocol(trans);
+      base.read(protocol);
+      trans.close();
+      return (T) base;
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private byte[] getData(String path) throws KeeperException, InterruptedException {
+    Stat stat = _zk.exists(path, false);
+    if (stat == null) {
+      return null;
+    }
+    return _zk.getData(path, false, stat);
+  }
+
+  @Override
+  public List<String> getTableList(boolean useCache, String cluster) {
+    if (useCache) {
+      Set<String> tables = _tablesPerCluster.get(cluster);
+      if (tables != null) {
+        return new ArrayList<String>(tables);
+      }
+    }
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      return _zk.getChildren(ZookeeperPathConstants.getTablesPath(cluster), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getTableList took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  public void close() {
+    if (_running.get()) {
+      _running.set(false);
+      close(_clusterWatcher);
+      close(_onlineShardsNodesWatchers);
+      close(_tableWatchers);
+      close(_safeModeWatchers);
+      close(_safeModeDataWatchers);
+      close(_enabledWatchNodeExistance);
+      close(_readOnlyWatchNodeExistance);
+    }
+  }
+
+  private void close(ConcurrentMap<String, ? extends Closeable> closableMap) {
+    Collection<? extends Closeable> values = closableMap.values();
+    for (Closeable closeable : values) {
+      close(closeable);
+    }
+  }
+
+  private void close(Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to close [{0}]", closeable);
+    }
+  }
+
+  @Override
+  public String getCluster(boolean useCache, String table) {
+    if (useCache) {
+      for (Entry<String, Set<String>> entry : _tablesPerCluster.entrySet()) {
+        if (entry.getValue().contains(table)) {
+          return entry.getKey();
+        }
+      }
+    }
+    List<String> clusterList = getClusterList(useCache);
+    for (String cluster : clusterList) {
+      long s = System.nanoTime();
+      try {
+        checkIfOpen();
+        Stat stat = _zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false);
+        if (stat != null) {
+          // _tableToClusterCache.put(table, cluster);
+          return cluster;
+        }
+      } catch (KeeperException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        long e = System.nanoTime();
+        LOG.debug("trace getCluster took [" + (e - s) / 1000000.0 + " ms]");
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isInSafeMode(boolean useCache, String cluster) {
+    if (useCache) {
+      Long safeModeTimestamp = _safeModeMap.get(cluster);
+      if (safeModeTimestamp != null && safeModeTimestamp != Long.MIN_VALUE) {
+        return safeModeTimestamp < System.currentTimeMillis() ? false : true;
+      }
+    }
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+      Stat stat = _zk.exists(blurSafemodePath, false);
+      if (stat == null) {
+        return false;
+      }
+      byte[] data = _zk.getData(blurSafemodePath, false, stat);
+      if (data == null) {
+        return false;
+      }
+      long timestamp = Long.parseLong(new String(data));
+      long waitTime = timestamp - System.currentTimeMillis();
+      if (waitTime > 0) {
+        return true;
+      }
+      return false;
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace isInSafeMode took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public int getShardCount(boolean useCache, String cluster, String table) {
+    if (useCache) {
+      TableDescriptor tableDescriptor = getTableDescriptor(true, cluster, table);
+      return tableDescriptor.shardCount;
+    }
+    long s = System.nanoTime();
+    try {
+      return Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableShardCountPath(cluster, table))));
+    } catch (NumberFormatException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getShardCount took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public Set<String> getBlockCacheFileTypes(String cluster, String table) {
+    long s = System.nanoTime();
+    try {
+      byte[] data = getData(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table));
+      if (data == null) {
+        return null;
+      }
+      String str = new String(data);
+      if (str.isEmpty()) {
+        return null;
+      }
+      Set<String> types = new HashSet<String>(Arrays.asList(str.split(",")));
+      if (types.isEmpty()) {
+        return null;
+      }
+      return types;
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace getBlockCacheFileTypes took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public boolean isBlockCacheEnabled(String cluster, String table) {
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      if (_zk.exists(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table), false) == null) {
+        return false;
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace isBlockCacheEnabled took [" + (e - s) / 1000000.0 + " ms]");
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isReadOnly(boolean useCache, String cluster, String table) {
+    if (useCache) {
+      Boolean ro = _readOnly.get(getClusterTableKey(cluster, table));
+      if (ro != null) {
+        return ro;
+      }
+    }
+    long s = System.nanoTime();
+    String path = ZookeeperPathConstants.getTableReadOnlyPath(cluster, table);
+    try {
+      checkIfOpen();
+      if (_zk.exists(path, false) == null) {
+        return false;
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace isReadOnly took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
  /**
   * Creates a new table: validates the descriptor, sets up the file system
   * for the table's shards, then writes the table's configuration nodes into
   * ZooKeeper.
   *
   * NOTE(review): the order the ZooKeeper nodes are created in may matter to
   * watchers elsewhere (the table node itself is created first) — confirm
   * before reordering.
   */
  @Override
  public void createTable(TableDescriptor tableDescriptor) {
    long s = System.nanoTime();
    try {
      checkIfOpen();
      // Fill in defaults for optional settings before validating.
      if (tableDescriptor.getCompressionClass() == null) {
        tableDescriptor.setCompressionClass(DeflateCodec.class.getName());
      }
      if (tableDescriptor.getSimilarityClass() == null) {
        tableDescriptor.setSimilarityClass(FairSimilarity.class.getName());
      }
      if (tableDescriptor.getAnalyzerDefinition() == null) {
        tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
      }
      // Validate required fields; nullCheck/zeroCheck throw on bad input.
      String table = BlurUtil.nullCheck(tableDescriptor.name, "tableDescriptor.name cannot be null.");
      String cluster = BlurUtil.nullCheck(tableDescriptor.cluster, "tableDescriptor.cluster cannot be null.");
      BlurAnalyzer analyzer = new BlurAnalyzer(BlurUtil.nullCheck(tableDescriptor.analyzerDefinition, "tableDescriptor.analyzerDefinition cannot be null."));
      String uri = BlurUtil.nullCheck(tableDescriptor.tableUri, "tableDescriptor.tableUri cannot be null.");
      int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount, "tableDescriptor.shardCount cannot be less than 1");
      // Instantiating the codec/similarity up front verifies the configured
      // class names are loadable before anything is written.
      CompressionCodec compressionCodec = BlurUtil.getInstance(tableDescriptor.compressionClass, CompressionCodec.class);
      // @TODO check block size
      int compressionBlockSize = tableDescriptor.compressionBlockSize;
      Similarity similarity = BlurUtil.getInstance(tableDescriptor.similarityClass, Similarity.class);
      boolean blockCaching = tableDescriptor.blockCaching;
      Set<String> blockCachingFileTypes = tableDescriptor.blockCachingFileTypes;
      String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
      ColumnPreCache columnPreCache = tableDescriptor.columnPreCache;

      if (_zk.exists(blurTablePath, false) != null) {
        throw new IOException("Table [" + table + "] already exists.");
      }
      // Create the shard directories on the file system, then write each
      // configuration value as its own child node of the table node.
      BlurUtil.setupFileSystem(uri, shardCount);
      BlurUtil.createPath(_zk, blurTablePath, analyzer.toJSON().getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableColumnsToPreCache(cluster, table), BlurUtil.read(columnPreCache));
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableUriPath(cluster, table), uri.getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableShardCountPath(cluster, table), Integer.toString(shardCount).getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableCompressionCodecPath(cluster, table), compressionCodec.getClass().getName().getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableCompressionBlockSizePath(cluster, table), Integer.toString(compressionBlockSize).getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableSimilarityPath(cluster, table), similarity.getClass().getName().getBytes());
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getLockPath(cluster, table), null);
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableFieldNamesPath(cluster, table), null);
      // Marker nodes: their mere existence encodes the flag.
      if (tableDescriptor.readOnly) {
        BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableReadOnlyPath(cluster, table), null);
      }
      if (blockCaching) {
        BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableBlockCachingPath(cluster, table), null);
      }
      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table), toBytes(blockCachingFileTypes));
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (KeeperException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      long e = System.nanoTime();
      LOG.debug("trace createTable took [" + (e - s) / 1000000.0 + " ms]");
    }
  }
+
+  @Override
+  public void disableTable(String cluster, String table) {
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
+      if (_zk.exists(blurTableEnabledPath, false) == null) {
+        throw new IOException("Table [" + table + "] already disabled.");
+      }
+      _zk.delete(blurTableEnabledPath, -1);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace disableTable took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public void enableTable(String cluster, String table) {
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
+      if (_zk.exists(blurTableEnabledPath, false) != null) {
+        throw new IOException("Table [" + table + "] already enabled.");
+      }
+      _zk.create(blurTableEnabledPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace enableTable took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  @Override
+  public void removeTable(String cluster, String table, boolean deleteIndexFiles) {
+    long s = System.nanoTime();
+    try {
+      checkIfOpen();
+      String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
+      if (_zk.exists(blurTablePath, false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      if (_zk.exists(ZookeeperPathConstants.getTableEnabledPath(cluster, table), false) != null) {
+        throw new IOException("Table [" + table + "] must be disabled before it can be removed.");
+      }
+      byte[] data = getData(ZookeeperPathConstants.getTableUriPath(cluster, table));
+      String uri = new String(data);
+      BlurUtil.removeAll(_zk, blurTablePath);
+      if (deleteIndexFiles) {
+        BlurUtil.removeIndexFiles(uri);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("trace removeTable took [" + (e - s) / 1000000.0 + " ms]");
+    }
+  }
+
+  private static byte[] toBytes(Set<String> blockCachingFileTypes) {
+    if (blockCachingFileTypes == null || blockCachingFileTypes.isEmpty()) {
+      return null;
+    }
+    StringBuilder builder = new StringBuilder();
+    for (String type : blockCachingFileTypes) {
+      builder.append(type).append(',');
+    }
+    return builder.substring(0, builder.length() - 1).getBytes();
+  }
+
  /**
   * @return true while this instance is open; false once {@link #close()} has
   *         been called.
   */
  @Override
  public boolean isOpen() {
    return _running.get();
  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
new file mode 100644
index 0000000..cb9eeed
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
@@ -0,0 +1,114 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
/**
 * Static helpers that build every ZooKeeper path used by Blur's cluster
 * status layer. All paths are rooted at {@link #getBasePath()}.
 */
public class ZookeeperPathConstants {

  // Utility class: prevent instantiation.
  private ZookeeperPathConstants() {
  }

  /** Root of all Blur state in ZooKeeper. */
  public static String getBasePath() {
    return "/blur";
  }

  public static String getClusterPath(String cluster) {
    return getClustersPath() + "/" + cluster;
  }

  public static String getClustersPath() {
    return getBasePath() + "/clusters";
  }

  /** Ephemeral registrations of online controller nodes. */
  public static String getOnlineControllersPath() {
    return getBasePath() + "/online-controller-nodes";
  }

  /** Marker node whose existence means the table is enabled. */
  public static String getTableEnabledPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/enabled";
  }

  public static String getTableUriPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/uri";
  }

  public static String getTableShardCountPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/shard-count";
  }

  public static String getOnlinePath(String cluster) {
    return getClusterPath(cluster) + "/online";
  }

  /** Shard servers currently online in the cluster. */
  public static String getOnlineShardsPath(String cluster) {
    return getOnlinePath(cluster) + "/shard-nodes";
  }

  public static String getTablesPath(String cluster) {
    return getClusterPath(cluster) + "/tables";
  }

  public static String getTablePath(String cluster, String table) {
    return getTablesPath(cluster) + "/" + table;
  }

  public static String getSafemodePath(String cluster) {
    return getClusterPath(cluster) + "/safemode";
  }

  /** All shard servers registered with the cluster, online or not. */
  public static String getRegisteredShardsPath(String cluster) {
    return getClusterPath(cluster) + "/shard-nodes";
  }

  public static String getTableCompressionCodecPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/compression-codec";
  }

  public static String getTableCompressionBlockSizePath(String cluster, String table) {
    return getTablePath(cluster, table) + "/compression-blocksize";
  }

  public static String getLockPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/locks";
  }

  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/blockcachingfiletypes";
  }

  /** Marker node whose existence means block caching is enabled. */
  public static String getTableBlockCachingPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/blockcaching";
  }

  public static String getTableSimilarityPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/similarity";
  }

  public static String getTableFieldNamesPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/fieldnames";
  }

  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
  }

  /** Marker node whose existence means the table is read-only. */
  public static String getTableReadOnlyPath(String cluster, String table) {
    return getTablePath(cluster, table) + "/readonly";
  }

  public static String getTableColumnsToPreCache(String cluster, String table) {
    return getTablePath(cluster, table) + "/precache";
  }

}


Mime
View raw message