incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/3] git commit: Added type checking so that the same named field can not have two different types.
Date Fri, 18 Jan 2013 02:31:18 GMT
Added type checking so that the same named field can not have two different types.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b2924318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b2924318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b2924318

Branch: refs/heads/0.2-dev
Commit: b2924318f960ae181997b54740393eadd21cf9c5
Parents: 4a8c377
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 17 17:12:06 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 17 17:12:06 2013 -0500

----------------------------------------------------------------------
 .../blur/lucene/search/QueryConverterImpl.java     |    4 +-
 .../java/org/apache/blur/manager/IndexServer.java  |    2 +-
 .../indexserver/DistributedIndexServer.java        |    7 +-
 .../blur/manager/writer/AbstractBlurIndex.java     |    4 +-
 .../blur/manager/writer/BlurIndexReader.java       |    2 +-
 .../apache/blur/manager/writer/BlurNRTIndex.java   |    4 +-
 .../blur/manager/writer/TransactionRecorder.java   |   12 +-
 .../java/org/apache/blur/server/BlurServer.java    |  607 +++++++++++++++
 .../java/org/apache/blur/server/Configurable.java  |   26 +
 .../java/org/apache/blur/server/Configured.java    |   26 +
 .../org/apache/blur/server/SearchCallable.java     |   66 ++
 .../java/org/apache/blur/server/SessionInfo.java   |  100 +++
 .../java/org/apache/blur/server/ShardContext.java  |   71 ++
 .../java/org/apache/blur/server/TableAdmin.java    |  240 ++++++
 .../java/org/apache/blur/server/TableContext.java  |  204 +++++
 .../java/org/apache/blur/server/TableLayout.java   |   11 +
 .../java/org/apache/blur/server/TypeChecker.java   |   22 +
 .../apache/blur/server/ZooKeeperTypeChecker.java   |   40 +
 .../java/org/apache/blur/thrift/BlurServer.java    |  604 --------------
 .../java/org/apache/blur/thrift/Configurable.java  |   26 -
 .../java/org/apache/blur/thrift/Configured.java    |   26 -
 .../org/apache/blur/thrift/SearchCallable.java     |   66 --
 .../java/org/apache/blur/thrift/SessionInfo.java   |  100 ---
 .../java/org/apache/blur/thrift/ShardContext.java  |   71 --
 .../java/org/apache/blur/thrift/TableAdmin.java    |  239 ------
 .../java/org/apache/blur/thrift/TableContext.java  |  181 -----
 .../java/org/apache/blur/thrift/TableLayout.java   |   11 -
 .../org/apache/blur/thrift/ThriftBlurServer.java   |    2 +
 .../apache/blur/utils/ThriftLuceneConversion.java  |   20 +-
 .../blur/manager/writer/BlurNRTIndexTest.java      |    6 +-
 .../src/main/java/org/apache/blur/CachedMap.java   |   53 ++
 .../org/apache/blur/zookeeper/ZkCachedMap.java     |  193 +++++
 32 files changed, 1693 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/lucene/search/QueryConverterImpl.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/lucene/search/QueryConverterImpl.java b/src/blur-core/src/main/java/org/apache/blur/lucene/search/QueryConverterImpl.java
index af5d883..cd931be 100644
--- a/src/blur-core/src/main/java/org/apache/blur/lucene/search/QueryConverterImpl.java
+++ b/src/blur-core/src/main/java/org/apache/blur/lucene/search/QueryConverterImpl.java
@@ -4,8 +4,8 @@ import java.io.IOException;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.lucene.LuceneVersionConstant;
-import org.apache.blur.thrift.Configured;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.Configured;
+import org.apache.blur.server.TableContext;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.Query;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
index 1268c08..231a78d 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
@@ -23,7 +23,7 @@ import java.util.SortedSet;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.TableContext;
 
 
 public interface IndexServer {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 4f2b25d..525f81a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -52,11 +52,12 @@ import org.apache.blur.manager.writer.BlurIndexReader;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.server.ZooKeeperTypeChecker;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.hdfs.BlurLockFactory;
-import org.apache.blur.thrift.ShardContext;
-import org.apache.blur.thrift.TableContext;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -367,7 +368,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   @Override
   public TableContext getTableContext(String table) throws IOException {
     TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
-    return TableContext.create(tableDescriptor);
+    return TableContext.create(tableDescriptor, new ZooKeeperTypeChecker(_zookeeper, ZookeeperPathConstants.getTablePath(_cluster, table), table));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
index 41bb945..d74f51d 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.blur.thrift.ShardContext;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index d437c48..538a1b7 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 6b49938..b00a688 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -29,8 +29,8 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.thrift.ShardContext;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 978add3..1f3ee22 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.ShardContext;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
@@ -142,14 +142,14 @@ public class TransactionRecorder {
       switch (lookup) {
       case ADD_DOCUMENTS:
         List<org.apache.blur.thrift.generated.Document> documents = readAddDocumentsFromWal(dataInputStream);
-        writer.addDocuments(toLucene(documents));
+        writer.addDocuments(toLucene(documents, tableContext.getTypeChecker()));
         addDocumentCount += documents.size();
         break;
       case UPDATE_DOCUMENTS:
         List<org.apache.blur.thrift.generated.UpdatePackage> updatePackages = readUpdatePackagesFromWal(dataInputStream);
         for (UpdatePackage updatePackage : updatePackages) {
           List<org.apache.blur.thrift.generated.Document> docs = updatePackage.getDocuments();
-          writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(docs));
+          writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(docs, tableContext.getTypeChecker()));
           updateDocumentCount += docs.size();
         }
         break;
@@ -295,7 +295,7 @@ public class TransactionRecorder {
     if (wal) {
       writeAddDocumentsToWal(documents);
     }
-    return writer.addDocuments(toLucene(documents));
+    return writer.addDocuments(toLucene(documents, tableContext.getTypeChecker()));
   }
 
   public long deleteDocuments(boolean wal, org.apache.blur.thrift.generated.Term[] deleteTerms, TrackingIndexWriter writer) throws IOException {
@@ -322,7 +322,7 @@ public class TransactionRecorder {
     }
     long generation = -1;
     for (UpdatePackage updatePackage : updatePackages) {
-      generation = writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(updatePackage.getDocuments()));
+      generation = writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(updatePackage.getDocuments(), tableContext.getTypeChecker()));
     }
     return generation;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
new file mode 100644
index 0000000..2c0fd59
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
@@ -0,0 +1,607 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
+import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.QueryConverter;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.BlurServer.SearchAction.ACTION;
+import org.apache.blur.server.TableLayout.TYPE;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Generation;
+import org.apache.blur.thrift.generated.MutateOptions;
+import org.apache.blur.thrift.generated.Query;
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.thrift.generated.QueryStatus;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.ShardLayout;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableSchema;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.blur.thrift.generated.UpdatePackage;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.BlurValidations;
+import org.apache.blur.utils.ThriftLuceneConversion;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.thrift.TException;
+
+public class BlurServer extends TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(BlurServer.class);
+  private IndexServer _indexServer;
+  private boolean _closed;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private ExecutorService _dataFetch;
+  private ExecutorService _indexSearcherExecutor;
+  private ExecutorService _searchExecutor;
+  private int _dataFetchThreadCount = 32;
+  private TableLayout _layout;
+
+  public void init() throws BlurException {
+    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
+    _searchExecutor = Executors.newThreadPool("search-", 16);
+
+    if (_configuration == null) {
+      throw new BException("Configuration must be set before initialization.");
+    }
+    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
+    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
+    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
+  }
+
+  @Override
+  public TableStats tableStats(String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      TableStats tableStats = new TableStats();
+      // tableStats.tableName = table;
+      // tableStats.recordCount = _indexServer.getRecordCount(table);
+      // tableStats.rowCount = _indexServer.getRowCount(table);
+      tableStats.bytes = _indexServer.getTableSize(table);
+      tableStats.queries = 0;
+      return tableStats;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _dataFetch.shutdownNow();
+    }
+  }
+
+  public long getMaxTimeToLive() {
+    return _maxTimeToLive;
+  }
+
+  public void setMaxTimeToLive(long maxTimeToLive) {
+    _maxTimeToLive = maxTimeToLive;
+  }
+
+  public int getMaxQueryCacheElements() {
+    return _maxQueryCacheElements;
+  }
+
+  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
+    _maxQueryCacheElements = maxQueryCacheElements;
+  }
+
+  public void setIndexServer(IndexServer indexServer) {
+    _indexServer = indexServer;
+  }
+
+  public int getDataFetchThreadCount() {
+    return _dataFetchThreadCount;
+  }
+
+  public void setDataFetchThreadCount(int dataFetchThreadCount) {
+    _dataFetchThreadCount = dataFetchThreadCount;
+  }
+
+  public void setConfiguration(BlurConfiguration conf) {
+    _configuration = conf;
+  }
+
+  // New interface from this point
+
+  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
+
+  @Override
+  public Session openReadSession(String table) throws BlurException, TException {
+    String uuid = UUID.randomUUID().toString();
+    return newSession(table, uuid);
+  }
+
+  private Session newSession(String table, String uuid) throws BlurException {
+    checkTable(table);
+    BlurAnalyzer analyzer = _indexServer.getAnalyzer(table);
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
+    SessionInfo sessionInfo = new SessionInfo();
+    sessionInfo.setUuid(uuid);
+    sessionInfo.setAnalyzer(analyzer);
+    sessionInfo.setTableDescriptor(tableDescriptor);
+
+    for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
+      int index = BlurUtil.getShardIndex(entry.getKey());
+      try {
+        IndexReader indexReader = entry.getValue().getIndexReader();
+        // @TODO use new thread pool here
+        IndexSearcher indexSearcher = new IndexSearcher(indexReader, _searchExecutor);
+        sessionInfo.add(index, indexReader);
+        sessionInfo.add(index, indexSearcher);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to fetch index readers.", e);
+      }
+    }
+    sessions.put(uuid, sessionInfo);
+    return new Session(uuid, table);
+  }
+
+  @Override
+  public List<TopFieldDocs> search(final Session session, final QueryArgs queryArgs) throws BlurException, TException {
+    SessionInfo info = getSessionInfo(session);
+    try {
+      Map<Integer, IndexSearcher> searchers = info.getSearchers();
+      List<Integer> shardIndexes = queryArgs.getShardIndexes();
+      TableDescriptor tableDescriptor = info.getTableDescriptor();
+      Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
+
+      List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
+      
+      TableContext context = _indexServer.getTableContext(session.getTableName());
+      QueryConverter queryConverter = context.getQueryConverter();
+
+      org.apache.lucene.search.Query query = queryConverter.convert(queryArgs.getQuery());
+      org.apache.lucene.search.Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
+      org.apache.lucene.search.Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
+      org.apache.lucene.search.ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
+      boolean doDocScores = queryArgs.isDoDocScores();
+      boolean doMaxScore = queryArgs.isDoMaxScore();
+      int numberToFetch = queryArgs.getNumberToFetch();
+      for (SearchAction action : searchersToSearch) {
+        final int shardIndex = action.shardIndex;
+        if (action.type == ACTION.LOCAL) {
+          SearchCallable searchCallable = new SearchCallable(shardIndex, action.indexSearcher, after, query, filter, sort, numberToFetch, doDocScores, doMaxScore);
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(searchCallable);
+          futures.add(future);
+        } else if (action.type == ACTION.REMOTE) {
+          // @TODO need to send only one call per server, instead of one for
+          // each shard in the table
+          final Connection connection = action.remoteServer;
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new Callable<TopFieldDocs>() {
+            @Override
+            public TopFieldDocs call() throws Exception {
+              List<TopFieldDocs> list = BlurClientManager.execute(connection, new BlurCommand<List<TopFieldDocs>>() {
+                @Override
+                public List<TopFieldDocs> call(Client client) throws BlurException, TException {
+                  QueryArgs remoteQueryArgs = new QueryArgs(queryArgs);
+                  remoteQueryArgs.addToShardIndexes(shardIndex);
+                  return client.search(session, remoteQueryArgs);
+                }
+              });
+              return list.iterator().next();
+            }
+          });
+          futures.add(future);
+        }
+      }
+
+      List<TopFieldDocs> result = new ArrayList<TopFieldDocs>(futures.size());
+      for (Future<TopFieldDocs> future : futures) {
+        result.add(future.get());
+      }
+      return result;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  static class SearchAction {
+    enum ACTION {
+      LOCAL, REMOTE
+    }
+
+    ACTION type;
+
+    SearchAction(int shardIndex, IndexSearcher indexSearcher) {
+      this.type = ACTION.LOCAL;
+      this.shardIndex = shardIndex;
+      this.indexSearcher = indexSearcher;
+    }
+
+    SearchAction(int shardIndex, Connection remoteServer) {
+      this.type = ACTION.REMOTE;
+      this.shardIndex = shardIndex;
+      this.remoteServer = remoteServer;
+    }
+
+    int shardIndex;
+    IndexSearcher indexSearcher;
+    Connection remoteServer;
+  }
+
+  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
+    String name = tableDescriptor.getName();
+    int shardCount = tableDescriptor.getShardCount();
+    Collection<SearchAction> searchersToSearch = new ArrayList<SearchAction>();
+    if (shardIndexes == null) {
+      shardIndexes = new ArrayList<Integer>(shardCount);
+      // all indexes
+      for (int i = 0; i < shardCount; i++) {
+        shardIndexes.add(i);
+      }
+    }
+    for (Integer index : shardIndexes) {
+      IndexSearcher searcher = searchers.get(index);
+      if (searcher != null) {
+        searchersToSearch.add(new SearchAction(index, searcher));
+      } else {
+        searchersToSearch.add(new SearchAction(index, getConnection(name, index)));
+      }
+    }
+    return searchersToSearch;
+  }
+
+  private SessionInfo getSessionInfo(Session session) throws BlurException {
+    SessionInfo info = sessions.get(session.getSessionId());
+    if (info == null) {
+      newSession(session.getTableName(), session.getSessionId());
+      return getSessionInfo(session);
+    }
+    return info;
+  }
+
+  @Override
+  public List<Document> doc(Session session, List<Long> docLocations, Set<String> fieldsToLoad) throws BlurException, TException {
+    try {
+      SessionInfo sessionInfo = getSessionInfo(session);
+      Map<Integer, IndexSearcher> searchers = sessionInfo.getSearchers();
+      List<Document> result = new ArrayList<Document>();
+      for (Long docLocation : docLocations) {
+        if (docLocation == null) {
+          throw new BlurException("Null docLocation is not allowed.", null);
+        }
+        int shardIndex = BlurUtil.getShardIndex(docLocation);
+        int docId = BlurUtil.getDocumentId(docLocation);
+        IndexSearcher searcher = searchers.get(shardIndex);
+        if (searcher == null) {
+          result.addAll(forwardDoc(session, shardIndex, docLocation, fieldsToLoad));
+        } else {
+          org.apache.lucene.document.Document document = searcher.document(docId, fieldsToLoad);
+          result.add(toThrift(document));
+        }
+      }
+      return result;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  private List<Document> forwardDoc(final Session session, int shardIndex, final Long docLocation, final Set<String> fieldsToLoad) throws BlurException, TException, IOException {
+    // TODO Make more efficient by making a single call to a server for many
+    // docLocations
+    String table = session.getTableName();
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Document>>() {
+      @Override
+      public List<Document> call(Client client) throws BlurException, TException {
+        return client.doc(session, Arrays.asList(docLocation), fieldsToLoad);
+      }
+    });
+  }
+
+  @Override
+  public void closeReadSession(Session session) throws BlurException, TException {
+    SessionInfo sessionInfo = getSessionInfo(session);
+    sessionInfo.releaseReaders();
+  }
+
+  @Override
+  public List<Generation> addDocuments(MutateOptions options, List<Document> documents) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = getShardIndex(options);
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex index = getIndex(table, shardIndex);
+      List<Document> indexableDocuments = BlurValidations.getAllIndexableDocuments(documents);
+      if (index == null) {
+        generations.addAll(forwardAddDocuments(options, indexableDocuments));
+      } else {
+        long generation = index.addDocuments(waitToBeVisible, writeAheadLog, indexableDocuments);
+        generations.add(new Generation(table, shardIndex, generation));
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+  
+  private int getShardIndex(MutateOptions options) {
+    int shardIndex = options.getShardIndex();
+    if (shardIndex < 0) {
+      // @TODO this is going to be very slow
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, options.getTable());
+      int shardCount = tableDescriptor.getShardCount();
+      Random random = new Random();
+      return random.nextInt(shardCount);
+    }
+    return shardIndex;
+  }
+
+  @Override
+  public void blockUntilGenerationIsVisible(List<Generation> generations, boolean forceRefresh) throws BlurException, TException {
+    try {
+      for (Generation generation : generations) {
+        String table = generation.getTable();
+        int shardIndex = generation.getShardIndex();
+        BlurIndex index = getIndex(table, shardIndex);
+        if (index == null) {
+          forwardBlockUntilGenerationIsVisible(generation, forceRefresh);
+        } else {
+          index.blockUntilGenerationIsVisible(generation.getGeneration(), forceRefresh);
+        }
+      }
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  private void forwardBlockUntilGenerationIsVisible(final Generation generation, final boolean forceRefresh) throws BlurException, TException, IOException {
+    String table = generation.getTable();
+    int shardIndex = generation.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    BlurClientManager.execute(connection, new BlurCommand<Void>() {
+      @Override
+      public Void call(Client client) throws BlurException, TException {
+        client.blockUntilGenerationIsVisible(Arrays.asList(generation), forceRefresh);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<org.apache.blur.thrift.generated.Query> queries) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = getShardIndex(options);
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex index = getIndex(table, shardIndex);
+      if (index == null) {
+        generations.addAll(forwardDeleteDocumentsByQueries(options, queries));
+      } else {
+        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, queries.toArray(new Query[queries.size()]));
+        generations.add(new Generation(table, shardIndex, generation));
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+  
+  @Override
+  public List<Generation> deleteDocuments(MutateOptions options, List<Term> terms) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = getShardIndex(options);
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex index = getIndex(table, shardIndex);
+      if (index == null) {
+        generations.addAll(forwardDeleteDocuments(options, terms));
+      } else {
+        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, terms.toArray(new Term[terms.size()]));
+        generations.add(new Generation(table, shardIndex, generation));
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  @Override
+  public List<Generation> updateDocuments(MutateOptions options, List<UpdatePackage> updatePackages) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = getShardIndex(options);
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex index = getIndex(table, shardIndex);
+      List<UpdatePackage> updatePackagesWithIndexableDocs = BlurValidations.getAllIndexablePackages(updatePackages); 
+      if (index == null) {
+        generations.addAll(forwardUpdateDocuments(options, updatePackagesWithIndexableDocs));
+      } else {
+        long generation = index.updateDocuments(waitToBeVisible, writeAheadLog, updatePackagesWithIndexableDocs);
+        generations.add(new Generation(table, shardIndex, generation));
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  private BlurIndex getIndex(String table, int shardIndex) throws BException {
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    BlurIndex index = blurIndexes.get(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardIndex));
+    return index;
+  }
+
+  @Override
+  public List<Integer> serverLayout(String table, String server) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public void cancelQuery(Session session, long id) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public List<Long> queryStatusIdList(Session session) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public QueryStatus queryStatus(Session session, long id) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public TableSchema schema(Session session, List<Integer> shardIds) throws BlurException, TException {
+    SessionInfo info = getSessionInfo(session);
+    
+    
+    
+    
+    throw new BlurException("Not implemented", null);
+  }
+
+  public TableLayout getLayout() {
+    return _layout;
+  }
+
+  public void setLayout(TableLayout layout) {
+    this._layout = layout;
+  }
+
+  private List<Generation> forwardAddDocuments(final MutateOptions options, final List<Document> documents) throws BlurException, TException, IOException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.addDocuments(options, documents);
+      }
+    });
+  }
+
+  private List<Generation> forwardUpdateDocuments(final MutateOptions options, final List<UpdatePackage> updatePackages) throws BlurException, TException, IOException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.updateDocuments(options, updatePackages);
+      }
+    });
+  }
+
+  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<org.apache.blur.thrift.generated.Query> queries) throws BlurException,
+      TException, IOException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.deleteDocumentsByQueries(options, queries);
+      }
+    });
+  }
+
+  private List<Generation> forwardDeleteDocuments(final MutateOptions options, final List<Term> terms) throws BlurException, TException, IOException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.deleteDocuments(options, terms);
+      }
+    });
+  }
+
+  private Connection getConnection(String table, int shardIndex) {
+    String server = _layout.findServer(table, shardIndex, TYPE.WRITABLE);
+    return new Connection(server);
+  }
+
+  @Override
+  public Map<String, ShardLayout> shardLayout(String table) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java b/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
new file mode 100644
index 0000000..368198f
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.server;
+
+public interface Configurable {
+
+  public void setTableContext(TableContext context);
+
+  public TableContext getTableContext();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/Configured.java b/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
new file mode 100644
index 0000000..b477de0
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
@@ -0,0 +1,26 @@
+package org.apache.blur.server;
+
+
+public abstract class Configured implements Configurable {
+
+  private TableContext context;
+
+  public Configured() {
+    this(null);
+  }
+
+  public Configured(TableContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public TableContext getTableContext() {
+    return context;
+  }
+
+  @Override
+  public void setTableContext(TableContext context) {
+    this.context = context;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/SearchCallable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/SearchCallable.java b/src/blur-core/src/main/java/org/apache/blur/server/SearchCallable.java
new file mode 100644
index 0000000..81b9158
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/SearchCallable.java
@@ -0,0 +1,66 @@
+package org.apache.blur.server;
+
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+
+public class SearchCallable implements Callable<TopFieldDocs> {
+  private final ScoreDoc after;
+  private final Sort sort;
+  private final Filter filter;
+  private final Query query;
+  private final IndexSearcher searcher;
+  private final int count;
+  private final boolean doDocScores;
+  private final boolean doMaxScore;
+  private final int shardIndex;
+
+  SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
+    this.after = after;
+    this.searcher = searcher;
+    this.query = query;
+    this.filter = filter;
+    this.sort = sort;
+    this.count = count;
+    this.doDocScores = doDocScores;
+    this.doMaxScore = doMaxScore;
+    this.shardIndex = shardIndex;
+  }
+
+  @Override
+  public TopFieldDocs call() throws Exception {
+    return addShardIndex(doSearch());
+  }
+
+  private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
+    topFieldDocs.setShardIndex(shardIndex);
+    return topFieldDocs;
+  }
+
+  private TopFieldDocs doSearch() throws IOException {
+    if (after == null) {
+      if (sort == null) {
+        return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
+      } else {
+        return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
+      }
+    } else {
+      if (sort == null) {
+        return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
+      } else {
+        return toThrift(setShardIndexTopFieldDocs(shardIndex,
+            (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java b/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
new file mode 100644
index 0000000..3fcc9e9
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
@@ -0,0 +1,100 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+
+public class SessionInfo {
+
+  private static final Log LOG = LogFactory.getLog(SessionInfo.class);
+
+  private String uuid;
+  private BlurAnalyzer analyzer;
+  private Map<Integer, IndexReader> readers = new HashMap<Integer, IndexReader>();
+  private Map<Integer, IndexSearcher> searchers = new HashMap<Integer, IndexSearcher>();
+  private TableDescriptor tableDescriptor;
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  public String getUuid() {
+    return uuid;
+  }
+
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  public void add(int index, IndexReader indexReader) {
+    readers.put(index, indexReader);
+  }
+
+  public void add(int index, IndexSearcher indexSearcher) {
+    searchers.put(index, indexSearcher);
+  }
+
+  public Map<Integer, IndexReader> getReaders() {
+    return readers;
+  }
+
+  public void setReaders(Map<Integer, IndexReader> readers) {
+    this.readers = readers;
+  }
+
+  public Map<Integer, IndexSearcher> getSearchers() {
+    return searchers;
+  }
+
+  public void setSearchers(Map<Integer, IndexSearcher> searchers) {
+    this.searchers = searchers;
+  }
+
+  public void releaseReaders() {
+    for (Entry<Integer, IndexReader> entry : readers.entrySet()) {
+      IndexReader reader = entry.getValue();
+      try {
+        reader.decRef();
+      } catch (IOException e) {
+        LOG.error("Unknown exception while trying to decRef on reader [{0}]", e, reader);
+      }
+    }
+  }
+
+  public TableDescriptor getTableDescriptor() {
+    return tableDescriptor;
+  }
+
+  public void setTableDescriptor(TableDescriptor tableDescriptor) {
+    this.tableDescriptor = tableDescriptor;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java b/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
new file mode 100644
index 0000000..7b7d864
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
@@ -0,0 +1,71 @@
+package org.apache.blur.server;
+
+import java.io.IOException;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+
+public class ShardContext {
+
+  private String shard;
+  private Path walShardPath;
+  private Path hdfsDirPath;
+  private Directory directory;
+  private TableContext tableContext;
+
+  public TableContext getTableContext() {
+    return tableContext;
+  }
+
+  public void setTableContext(TableContext tableContext) {
+    this.tableContext = tableContext;
+  }
+
+  protected ShardContext() {
+
+  }
+
+  public Directory getDirectory() {
+    return directory;
+  }
+
+  public void setDirectory(Directory directory) {
+    this.directory = directory;
+  }
+
+  public Path getHdfsDirPath() {
+    return hdfsDirPath;
+  }
+
+  public void setHdfsDirPath(Path hdfsDirPath) {
+    this.hdfsDirPath = hdfsDirPath;
+  }
+
+  public String getShard() {
+    return shard;
+  }
+
+  public void setShard(String shard) {
+    this.shard = shard;
+  }
+
+  public Path getWalShardPath() {
+    return walShardPath;
+  }
+
+  public void setWalShardPath(Path walShardPath) {
+    this.walShardPath = walShardPath;
+  }
+
+  public static ShardContext create(TableContext tableContext, String shard) throws IOException {
+    ShardContext shardContext = new ShardContext();
+    shardContext.tableContext = tableContext;
+    shardContext.walShardPath = new Path(tableContext.getWalTablePath(), shard);
+    shardContext.hdfsDirPath = new Path(tableContext.getTablePath(), shard);
+    shardContext.shard = shard;
+    shardContext.directory = new HdfsDirectory(tableContext.getConfiguration(), shardContext.hdfsDirPath);
+    return shardContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/TableAdmin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/TableAdmin.java b/src/blur-core/src/main/java/org/apache/blur/server/TableAdmin.java
new file mode 100644
index 0000000..813a9f6
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/TableAdmin.java
@@ -0,0 +1,240 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public abstract class TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(TableAdmin.class);
+  protected ZooKeeper _zookeeper;
+  protected ClusterStatus _clusterStatus;
+  protected BlurConfiguration _configuration;
+  private String cluster;
+
+  @Override
+  public boolean isInSafeMode() throws BlurException, TException {
+    try {
+      return _clusterStatus.isInSafeMode(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error during safe mode check of [cluster={0}]", e, cluster);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public final void createTable(TableDescriptor tableDescriptor) throws BlurException, TException {
+    try {
+      _clusterStatus.createTable(tableDescriptor);
+    } catch (Exception e) {
+      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name, tableDescriptor);
+      throw new BException(e.getMessage(), e);
+    }
+    if (tableDescriptor.isEnabled()) {
+      enableTable(tableDescriptor.getName());
+    }
+  }
+
+  @Override
+  public final void disableTable(String table) throws BlurException, TException {
+    try {
+      _clusterStatus.disableTable(table);
+      waitForTheTableToDisable(table);
+      waitForTheTableToDisengage(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during disable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void waitForTheTableToDisengage(String table) throws BlurException, TException {
+    // LOG.info("Waiting for shards to disengage on table [" + table + "]");
+  }
+
+  private void waitForTheTableToDisable(String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disable on table [" + table + "]");
+    while (true) {
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
+      if (!tableDescriptor.isEnabled()) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
+  @Override
+  public final void enableTable(String table) throws BlurException, TException {
+    try {
+      _clusterStatus.enableTable(table);
+      waitForTheTableToEnable(table);
+      waitForTheTableToEngage(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during enable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void waitForTheTableToEnable(String table) throws BlurException {
+    LOG.info("Waiting for shards to engage on table [" + table + "]");
+    while (true) {
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
+      if (tableDescriptor.isEnabled()) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
+  private void waitForTheTableToEngage(String table) throws BlurException, TException {
+    LOG.info("IMPLEMENT - Waiting for shards to engage on table [" + table + "]");
+    
+//    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
+//    int shardCount = tableDescriptor.shardCount;
+//    LOG.info("Waiting for shards to engage on table [" + table + "]");
+//    while (true) {
+//      try {
+//        Thread.sleep(3000);
+//      } catch (InterruptedException e) {
+//        LOG.error("Unknown error while engaging table [" + table + "]", e);
+//        throw new BException("Unknown error while engaging table [" + table + "]", e);
+//      }
+//      try {
+//        Map<String, String> shardServerLayout = getLayout();//shardServerLayout(table);
+//        LOG.info("Shards [" + shardServerLayout.size() + "/" + shardCount + "] of table [" + table + "] engaged");
+//        if (shardServerLayout.size() == shardCount) {
+//          return;
+//        }
+//      } catch (BlurException e) {
+//        LOG.info("Stilling waiting", e);
+//      } catch (TException e) {
+//        LOG.info("Stilling waiting", e);
+//      }
+//    }
+  }
+
+  private Map<String, String> getLayout() throws BlurException, TException {
+    return null;
+  }
+
+  @Override
+  public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
+    try {
+      _clusterStatus.removeTable(table, deleteIndexFiles);
+    } catch (Exception e) {
+      LOG.error("Unknown error during remove of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public void checkTable(String table) throws BlurException {
+    if (table == null) {
+      throw new BlurException("Table cannot be null.", null);
+    }
+    boolean inSafeMode = _clusterStatus.isInSafeMode(true);
+    if (inSafeMode) {
+      throw new BlurException("Cluster for [" + table + "] is in safe mode", null);
+    }
+    boolean exists = _clusterStatus.exists(true, table);
+    if (exists) {
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
+      if (tableDescriptor.isEnabled()) {
+        return;
+      }
+      throw new BlurException("Table [" + table + "] exists, but is not enabled", null);
+    } else {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+  }
+
+
+  public void checkForUpdates(String table) throws BlurException {
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
+    if (tableDescriptor.isReadOnly()) {
+      throw new BlurException("Table [" + table + "] in cluster [" + cluster + "] is read only.", null);
+    }
+  }
+
+
+  @Override
+  public final List<String> serverList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getServerList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a shard server list.", e);
+      throw new BException("Unknown error while trying to get a shard server list.", e);
+    }
+  }
+
+  @Override
+  public final TableDescriptor describe(final String table) throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableDescriptor(true, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to describe a table [" + table + "].", e);
+      throw new BException("Unknown error while trying to describe a table [" + table + "].", e);
+    }
+  }
+
+  @Override
+  public final List<String> tableList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a table list.", e);
+      throw new BException("Unknown error while trying to get a table list.", e);
+    }
+  }
+
+  public ClusterStatus getClusterStatus() {
+    return _clusterStatus;
+  }
+
+  public void setClusterStatus(ClusterStatus clusterStatus) {
+    _clusterStatus = clusterStatus;
+  }
+
+  public void setZookeeper(ZooKeeper zookeeper) {
+    _zookeeper = zookeeper;
+  }
+  
+  public void setConfiguration(BlurConfiguration config) {
+    _configuration = config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
new file mode 100644
index 0000000..f4a6081
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -0,0 +1,204 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_SIMILARITY_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_QUERY_CONVERTER_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.QueryConverter;
+import org.apache.blur.lucene.search.QueryConverterImpl;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.search.similarities.DefaultSimilarity;
+import org.apache.lucene.search.similarities.Similarity;
+import org.apache.zookeeper.ZooKeeper;
+
+public class TableContext {
+
+  private static final Log LOG = LogFactory.getLog(TableContext.class);
+
+  private static final String LOGS = "logs";
+
+  private Path tablePath;
+  private Path walTablePath;
+  private BlurAnalyzer analyzer;
+  private String defaultFieldName;
+  private String table;
+  private IndexDeletionPolicy indexDeletionPolicy;
+  private Similarity similarity;
+  private QueryConverter queryConverter;
+  private Configuration configuration;
+  private TableDescriptor descriptor;
+  private long timeBetweenCommits;
+  private long timeBetweenRefreshs;
+  private TypeChecker typeChecker;
+
+  public TypeChecker getTypeChecker() {
+    return typeChecker;
+  }
+
+  protected TableContext() {
+
+  }
+
+  public static TableContext create(TableDescriptor tableDescriptor, TypeChecker typeChecker) {
+    LOG.info("Creating table context for table [{0}]", tableDescriptor.getName());
+    Configuration configuration = new Configuration();
+    Map<String, String> properties = tableDescriptor.getProperties();
+    if (properties != null) {
+      for (Entry<String, String> prop : properties.entrySet()) {
+        configuration.set(prop.getKey(), prop.getValue());
+      }
+    }
+
+    TableContext tableContext = new TableContext();
+    tableContext.configuration = configuration;
+    tableContext.tablePath = new Path(tableDescriptor.getStoragePath());
+    tableContext.walTablePath = new Path(tableContext.tablePath, LOGS);
+    tableContext.analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzer());
+    tableContext.defaultFieldName = tableDescriptor.getDefaultFieldName();
+    tableContext.table = tableDescriptor.getName();
+    tableContext.descriptor = tableDescriptor;
+    tableContext.timeBetweenCommits = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000);
+    tableContext.timeBetweenRefreshs = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 5000);
+    tableContext.typeChecker = typeChecker;
+
+    Class<?> c1 = configuration.getClass(BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS, KeepOnlyLastCommitDeletionPolicy.class);
+    tableContext.indexDeletionPolicy = (IndexDeletionPolicy) configure(ReflectionUtils.newInstance(c1, configuration), tableContext);
+    Class<?> c2 = configuration.getClass(BLUR_LUCENE_INDEX_SIMILARITY_CLASS, DefaultSimilarity.class);
+    tableContext.similarity = (Similarity) configure(ReflectionUtils.newInstance(c2, configuration), tableContext);
+    Class<?> c3 = configuration.getClass(BLUR_QUERY_CONVERTER_CLASS, QueryConverterImpl.class);
+    tableContext.queryConverter = (QueryConverter) configure(ReflectionUtils.newInstance(c3, configuration), tableContext);
+    return tableContext;
+  }
+
+  private static Object configure(Object o, TableContext tableContext) {
+    if (o instanceof Configurable) {
+      ((Configurable) o).setTableContext(tableContext);
+    }
+    return o;
+  }
+
+  public IndexDeletionPolicy getIndexDeletionPolicy() {
+    return indexDeletionPolicy;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    this.indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public Similarity getSimilarity() {
+    return similarity;
+  }
+
+  public void setSimilarity(Similarity similarity) {
+    this.similarity = similarity;
+  }
+
+  public long getTimeBetweenCommits() {
+    return timeBetweenCommits;
+  }
+
+  public void setTimeBetweenCommits(long timeBetweenCommits) {
+    this.timeBetweenCommits = timeBetweenCommits;
+  }
+
+  public long getTimeBetweenRefreshs() {
+    return timeBetweenRefreshs;
+  }
+
+  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
+    this.timeBetweenRefreshs = timeBetweenRefreshs;
+  }
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public QueryConverter getQueryConverter() {
+    return queryConverter;
+  }
+
+  public void setQueryConverter(QueryConverter queryConverter) {
+    this.queryConverter = queryConverter;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  public TableDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+  public void setDescriptor(TableDescriptor descriptor) {
+    this.descriptor = descriptor;
+  }
+
+  public Path getTablePath() {
+    return tablePath;
+  }
+
+  public void setTablePath(Path tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public Path getWalTablePath() {
+    return walTablePath;
+  }
+
+  public void setWalTablePath(Path walTablePath) {
+    this.walTablePath = walTablePath;
+  }
+
+  public String getDefaultFieldName() {
+    return defaultFieldName;
+  }
+
+  public void setDefaultFieldName(String defaultFieldName) {
+    this.defaultFieldName = defaultFieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/TableLayout.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/TableLayout.java b/src/blur-core/src/main/java/org/apache/blur/server/TableLayout.java
new file mode 100644
index 0000000..478976a
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/TableLayout.java
@@ -0,0 +1,11 @@
+package org.apache.blur.server;
+
+public abstract class TableLayout {
+
+  public enum TYPE {
+    WRITABLE, ONLINE, BACKUP
+  }
+  
+  public abstract String findServer(String table, int shard, TYPE type);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/TypeChecker.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/TypeChecker.java b/src/blur-core/src/main/java/org/apache/blur/server/TypeChecker.java
new file mode 100644
index 0000000..9a66c63
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/TypeChecker.java
@@ -0,0 +1,22 @@
+package org.apache.blur.server;
+
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Field;
+
+public abstract class TypeChecker {
+
+  /**
+   * Validates that the type for the given field (by name) is correct. If the
+   * field (by name) is missing then the type is recorded and every check for
+   * that field thereafter will throw an exception if the type does not match
+   * the original type. In other words the first type for a given field wins.
+   * 
+   * @param field
+   *          the field to validate.
+   * @throws IOException
+   *           throws an IOException if the type is incorrect.
+   */
+  public abstract void validate(Field field) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/server/ZooKeeperTypeChecker.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/ZooKeeperTypeChecker.java b/src/blur-core/src/main/java/org/apache/blur/server/ZooKeeperTypeChecker.java
new file mode 100644
index 0000000..90002ea
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/ZooKeeperTypeChecker.java
@@ -0,0 +1,40 @@
+package org.apache.blur.server;
+
+import java.io.IOException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Field;
+import org.apache.blur.zookeeper.ZkCachedMap;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZooKeeperTypeChecker extends TypeChecker {
+
+  private final static Log LOG = LogFactory.getLog(ZooKeeperTypeChecker.class);
+
+  private ZkCachedMap cachedMap;
+  private String _table;
+
+  public ZooKeeperTypeChecker(ZooKeeper zooKeeper, String tablePath, String tableName) {
+    cachedMap = new ZkCachedMap(zooKeeper, tablePath);
+    _table = tableName;
+  }
+
+  @Override
+  public void validate(Field field) throws IOException {
+    String fieldType = field.getType().name();
+    String fieldName = field.getName();
+    String existingType = cachedMap.get(fieldName);
+    if (existingType == null) {
+      LOG.info("Trying to record new type [{0}] for field [{1}] in table [{2}]", fieldType, fieldName, _table);
+      if (!cachedMap.putIfMissing(fieldName, fieldType)) {
+        LOG.info("Another node recorded new type for field [{0}] in table [{1}], re-reading from source.", fieldName, _table);
+      }
+      existingType = cachedMap.get(fieldName);
+    }
+    if (!existingType.equals(fieldType)) {
+      LOG.error("The field type [{0}] for field [{1}] does not match the table type of [{2}] for table [{3}]", fieldType, fieldName, existingType, _table);
+      throw new IOException("The field type [" + fieldType + "] for field [" + fieldName + "] does not match the table type of [" + existingType + "] for table [" + _table + "]");
+    }
+  }
+}


Mime
View raw message