incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] Added type checking so that the same named field can not have two different types.
Date Fri, 18 Jan 2013 02:31:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
deleted file mode 100644
index 76d30f8..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
+++ /dev/null
@@ -1,604 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
-import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.search.QueryConverter;
-import org.apache.blur.manager.IndexServer;
-import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thrift.BlurServer.SearchAction.ACTION;
-import org.apache.blur.thrift.TableLayout.TYPE;
-import org.apache.blur.thrift.commands.BlurCommand;
-import org.apache.blur.thrift.generated.Blur.Client;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Document;
-import org.apache.blur.thrift.generated.Generation;
-import org.apache.blur.thrift.generated.MutateOptions;
-import org.apache.blur.thrift.generated.Query;
-import org.apache.blur.thrift.generated.QueryArgs;
-import org.apache.blur.thrift.generated.QueryStatus;
-import org.apache.blur.thrift.generated.Session;
-import org.apache.blur.thrift.generated.ShardLayout;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableSchema;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Term;
-import org.apache.blur.thrift.generated.TopFieldDocs;
-import org.apache.blur.thrift.generated.UpdatePackage;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.BlurValidations;
-import org.apache.blur.utils.ThriftLuceneConversion;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.thrift.TException;
-
-public class BlurServer extends TableAdmin implements Iface {
-
-  private static final Log LOG = LogFactory.getLog(BlurServer.class);
-  private IndexServer _indexServer;
-  private boolean _closed;
-  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
-  private int _maxQueryCacheElements = 128;
-  private ExecutorService _dataFetch;
-  private ExecutorService _indexSearcherExecutor;
-  private ExecutorService _searchExecutor;
-  private int _dataFetchThreadCount = 32;
-  private TableLayout _layout;
-
-  /**
-   * Reads the tunable settings from the configuration and then starts the
-   * worker thread pools.
-   * <p>
-   * The configuration is validated and read <em>before</em> any pool is
-   * created. This way a missing configuration does not leave already started
-   * executor threads behind, and the data-fetch pool is sized from the
-   * configured {@code BLUR_SHARD_DATA_FETCH_THREAD_COUNT} value rather than
-   * from whatever the field held before the configuration was consulted.
-   *
-   * @throws BlurException
-   *           if no configuration has been set
-   */
-  public void init() throws BlurException {
-    if (_configuration == null) {
-      throw new BException("Configuration must be set before initialization.");
-    }
-    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
-    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
-    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
-
-    // Pools are created last so that they pick up the values read above.
-    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
-    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
-    _searchExecutor = Executors.newThreadPool("search-", 16);
-  }
-
-  @Override
-  public TableStats tableStats(String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      TableStats tableStats = new TableStats();
-      // tableStats.tableName = table;
-      // tableStats.recordCount = _indexServer.getRecordCount(table);
-      // tableStats.rowCount = _indexServer.getRowCount(table);
-      tableStats.bytes = _indexServer.getTableSize(table);
-      tableStats.queries = 0;
-      return tableStats;
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Stops every thread pool owned by this server. Calling it more than once
-   * is harmless, and it is safe to call even if {@link #init()} never ran
-   * (pools that were never created are skipped).
-   */
-  public synchronized void close() {
-    if (_closed) {
-      return;
-    }
-    _closed = true;
-    shutdownExecutor(_dataFetch);
-    shutdownExecutor(_indexSearcherExecutor);
-    shutdownExecutor(_searchExecutor);
-  }
-
-  /** Stops the given pool immediately; a pool that was never created is ignored. */
-  private static void shutdownExecutor(ExecutorService executor) {
-    if (executor != null) {
-      executor.shutdownNow();
-    }
-  }
-
-  /** @return the current max-time-to-live setting, as stored in this server */
-  public long getMaxTimeToLive() {
-    return this._maxTimeToLive;
-  }
-
-  /** @param maxTimeToLive the new max-time-to-live value to store */
-  public void setMaxTimeToLive(long maxTimeToLive) {
-    this._maxTimeToLive = maxTimeToLive;
-  }
-
-  /** @return the configured maximum number of query cache elements */
-  public int getMaxQueryCacheElements() {
-    return this._maxQueryCacheElements;
-  }
-
-  /** @param maxQueryCacheElements the new maximum number of query cache elements */
-  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
-    this._maxQueryCacheElements = maxQueryCacheElements;
-  }
-
-  /** @param indexServer the index server this server delegates index access to */
-  public void setIndexServer(IndexServer indexServer) {
-    this._indexServer = indexServer;
-  }
-
-  /** @return the stored data-fetch thread count */
-  public int getDataFetchThreadCount() {
-    return this._dataFetchThreadCount;
-  }
-
-  /** @param dataFetchThreadCount the data-fetch thread count to store */
-  public void setDataFetchThreadCount(int dataFetchThreadCount) {
-    this._dataFetchThreadCount = dataFetchThreadCount;
-  }
-
-  /** @param conf the configuration that {@code init()} reads its settings from */
-  public void setConfiguration(BlurConfiguration conf) {
-    this._configuration = conf;
-  }
-
-  // New interface from this point
-
-  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
-
-  /**
-   * Opens a new read session on the table under a freshly generated random
-   * UUID.
-   *
-   * @param table
-   *          the table to open the session on
-   * @return the handle identifying the new session
-   */
-  @Override
-  public Session openReadSession(String table) throws BlurException, TException {
-    return newSession(table, UUID.randomUUID().toString());
-  }
-
-  /**
-   * Creates the server-side state for a session and registers it under the
-   * given id.
-   * <p>
-   * For every index returned by the index server an index reader and an index
-   * searcher are captured. A shard whose reader cannot be obtained is logged
-   * and left out of the session rather than failing the whole call.
-   *
-   * @param table
-   *          the table the session reads from
-   * @param uuid
-   *          the session id to register the state under
-   * @return the session handle for the given id and table
-   * @throws BlurException
-   *           if the table check fails or the indexes cannot be fetched
-   */
-  private Session newSession(String table, String uuid) throws BlurException {
-    checkTable(table);
-    BlurAnalyzer tableAnalyzer = _indexServer.getAnalyzer(table);
-    Map<String, BlurIndex> indexesByShard;
-    try {
-      indexesByShard = _indexServer.getIndexes(table);
-    } catch (IOException ioe) {
-      LOG.error("Unknown error while trying to fetch index readers.", ioe);
-      throw new BException(ioe.getMessage(), ioe);
-    }
-    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, table);
-
-    SessionInfo state = new SessionInfo();
-    state.setUuid(uuid);
-    state.setAnalyzer(tableAnalyzer);
-    state.setTableDescriptor(descriptor);
-
-    for (Entry<String, BlurIndex> shard : indexesByShard.entrySet()) {
-      int shardIndex = BlurUtil.getShardIndex(shard.getKey());
-      try {
-        IndexReader reader = shard.getValue().getIndexReader();
-        // @TODO use new thread pool here
-        IndexSearcher searcher = new IndexSearcher(reader, _searchExecutor);
-        state.add(shardIndex, reader);
-        state.add(shardIndex, searcher);
-      } catch (IOException ioe) {
-        // Best effort: this shard is simply not part of the session.
-        LOG.error("Unknown error while trying to fetch index readers.", ioe);
-      }
-    }
-    sessions.put(uuid, state);
-    return new Session(uuid, table);
-  }
-
-  /**
-   * Runs the query against every requested shard and returns one result per
-   * shard, in the order the shards were scheduled.
-   * <p>
-   * Shards that have a searcher in the session are searched locally; all
-   * other shards are forwarded to the server returned by
-   * {@code getConnection}. Each forwarded request is restricted to exactly one
-   * shard: the shard list of the copied arguments is <em>replaced</em>, not
-   * appended to. Appending to a copy that presumably still carries the
-   * caller's shard list would make the remote server search extra shards, and
-   * the first element of its answer would not necessarily belong to the
-   * requested shard.
-   *
-   * @param session
-   *          the read session to search in
-   * @param queryArgs
-   *          query, filter, sort, paging and shard selection
-   * @return the per-shard results
-   * @throws BlurException
-   *           wrapping any failure raised while searching
-   */
-  @Override
-  public List<TopFieldDocs> search(final Session session, final QueryArgs queryArgs) throws BlurException, TException {
-    SessionInfo info = getSessionInfo(session);
-    try {
-      Map<Integer, IndexSearcher> searchers = info.getSearchers();
-      List<Integer> shardIndexes = queryArgs.getShardIndexes();
-      TableDescriptor tableDescriptor = info.getTableDescriptor();
-      Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
-
-      List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
-
-      TableContext context = _indexServer.getTableContext(session.getTableName());
-      QueryConverter queryConverter = context.getQueryConverter();
-
-      org.apache.lucene.search.Query query = queryConverter.convert(queryArgs.getQuery());
-      org.apache.lucene.search.Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
-      org.apache.lucene.search.Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
-      org.apache.lucene.search.ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
-      boolean doDocScores = queryArgs.isDoDocScores();
-      boolean doMaxScore = queryArgs.isDoMaxScore();
-      int numberToFetch = queryArgs.getNumberToFetch();
-      for (SearchAction action : searchersToSearch) {
-        final int shardIndex = action.shardIndex;
-        if (action.type == ACTION.LOCAL) {
-          SearchCallable searchCallable = new SearchCallable(shardIndex, action.indexSearcher, after, query, filter, sort, numberToFetch, doDocScores, doMaxScore);
-          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(searchCallable);
-          futures.add(future);
-        } else if (action.type == ACTION.REMOTE) {
-          // @TODO need to send only one call per server, instead of one for
-          // each shard in the table
-          final Connection connection = action.remoteServer;
-          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new Callable<TopFieldDocs>() {
-            @Override
-            public TopFieldDocs call() throws Exception {
-              List<TopFieldDocs> list = BlurClientManager.execute(connection, new BlurCommand<List<TopFieldDocs>>() {
-                @Override
-                public List<TopFieldDocs> call(Client client) throws BlurException, TException {
-                  QueryArgs remoteQueryArgs = new QueryArgs(queryArgs);
-                  // Replace (not extend) the shard selection so the remote
-                  // server searches this shard only.
-                  List<Integer> onlyThisShard = new ArrayList<Integer>(1);
-                  onlyThisShard.add(shardIndex);
-                  remoteQueryArgs.setShardIndexes(onlyThisShard);
-                  return client.search(session, remoteQueryArgs);
-                }
-              });
-              return list.iterator().next();
-            }
-          });
-          futures.add(future);
-        }
-      }
-
-      List<TopFieldDocs> result = new ArrayList<TopFieldDocs>(futures.size());
-      for (Future<TopFieldDocs> future : futures) {
-        result.add(future.get());
-      }
-      return result;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  /**
-   * Describes how a single shard is going to be searched: either with a local
-   * index searcher or by calling a remote server. Exactly one of
-   * {@code indexSearcher} and {@code remoteServer} is set, depending on which
-   * constructor was used.
-   */
-  static class SearchAction {
-    /** Where the search for a shard is executed. */
-    enum ACTION {
-      LOCAL, REMOTE
-    }
-
-    ACTION type;
-    int shardIndex;
-    IndexSearcher indexSearcher;
-    Connection remoteServer;
-
-    /** Creates a {@code LOCAL} action that searches the shard with the given searcher. */
-    SearchAction(int shardIndex, IndexSearcher indexSearcher) {
-      this.shardIndex = shardIndex;
-      this.indexSearcher = indexSearcher;
-      this.type = ACTION.LOCAL;
-    }
-
-    /** Creates a {@code REMOTE} action that sends the shard's search to the given server. */
-    SearchAction(int shardIndex, Connection remoteServer) {
-      this.shardIndex = shardIndex;
-      this.remoteServer = remoteServer;
-      this.type = ACTION.REMOTE;
-    }
-  }
-
-  /**
-   * Decides, shard by shard, whether a search runs locally or remotely.
-   *
-   * @param tableDescriptor
-   *          supplies the table name and the total shard count
-   * @param shardIndexes
-   *          the shards to search, or {@code null} to search every shard from
-   *          {@code 0} up to the shard count
-   * @param searchers
-   *          the local searchers keyed by shard index
-   * @return one action per requested shard; shards without a local searcher
-   *         get a remote action
-   */
-  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
-    String tableName = tableDescriptor.getName();
-    int shardCount = tableDescriptor.getShardCount();
-
-    List<Integer> targets = shardIndexes;
-    if (targets == null) {
-      // No explicit selection means all shards of the table.
-      targets = new ArrayList<Integer>(shardCount);
-      for (int shard = 0; shard < shardCount; shard++) {
-        targets.add(shard);
-      }
-    }
-
-    Collection<SearchAction> actions = new ArrayList<SearchAction>();
-    for (Integer shard : targets) {
-      IndexSearcher local = searchers.get(shard);
-      if (local == null) {
-        actions.add(new SearchAction(shard, getConnection(tableName, shard)));
-      } else {
-        actions.add(new SearchAction(shard, local));
-      }
-    }
-    return actions;
-  }
-
-  /**
-   * Looks up the server-side state of a session, creating it on demand from
-   * the table name and id carried by the handle when this server has no entry
-   * for it yet.
-   *
-   * @param session
-   *          the session handle
-   * @return the registered session state, never {@code null}
-   * @throws BlurException
-   *           if the state has to be created and that fails
-   */
-  private SessionInfo getSessionInfo(Session session) throws BlurException {
-    SessionInfo found = sessions.get(session.getSessionId());
-    while (found == null) {
-      newSession(session.getTableName(), session.getSessionId());
-      found = sessions.get(session.getSessionId());
-    }
-    return found;
-  }
-
-  /**
-   * Fetches the documents at the given locations. Each location encodes a
-   * shard index and a document id (decoded through {@code BlurUtil}); shards
-   * with a local searcher are read directly, the rest are forwarded.
-   *
-   * @param session
-   *          the read session to fetch from
-   * @param docLocations
-   *          the locations to fetch; {@code null} entries are rejected
-   * @param fieldsToLoad
-   *          passed through to the searcher / remote call
-   * @return the fetched documents, in request order
-   * @throws BlurException
-   *           wrapping any failure, including a {@code null} location
-   */
-  @Override
-  public List<Document> doc(Session session, List<Long> docLocations, Set<String> fieldsToLoad) throws BlurException, TException {
-    try {
-      Map<Integer, IndexSearcher> localSearchers = getSessionInfo(session).getSearchers();
-      List<Document> documents = new ArrayList<Document>();
-      for (Long location : docLocations) {
-        if (location == null) {
-          throw new BlurException("Null docLocation is not allowed.", null);
-        }
-        int shardIndex = BlurUtil.getShardIndex(location);
-        int docId = BlurUtil.getDocumentId(location);
-        IndexSearcher local = localSearchers.get(shardIndex);
-        if (local != null) {
-          org.apache.lucene.document.Document luceneDocument = local.document(docId, fieldsToLoad);
-          documents.add(toThrift(luceneDocument));
-        } else {
-          documents.addAll(forwardDoc(session, shardIndex, location, fieldsToLoad));
-        }
-      }
-      return documents;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  /**
-   * Fetches a single document from the server returned by
-   * {@code getConnection} for the given shard.
-   *
-   * @return whatever the remote {@code doc} call returns for the one location
-   */
-  private List<Document> forwardDoc(final Session session, int shardIndex, final Long docLocation, final Set<String> fieldsToLoad) throws BlurException, TException, IOException {
-    // TODO Make more efficient by making a single call to a server for many
-    // docLocations
-    Connection target = getConnection(session.getTableName(), shardIndex);
-    return BlurClientManager.execute(target, new BlurCommand<List<Document>>() {
-      @Override
-      public List<Document> call(Client client) throws BlurException, TException {
-        List<Long> single = Arrays.asList(docLocation);
-        return client.doc(session, single, fieldsToLoad);
-      }
-    });
-  }
-
-  /**
-   * Closes a read session: the session is unregistered from this server and
-   * its readers are released.
-   * <p>
-   * The entry is removed from the session map so closed sessions no longer
-   * accumulate. Closing a session this server does not know is a no-op; it
-   * does not open a fresh session just to release it again.
-   *
-   * @param session
-   *          the session handle to close
-   */
-  @Override
-  public void closeReadSession(Session session) throws BlurException, TException {
-    SessionInfo sessionInfo = sessions.remove(session.getSessionId());
-    if (sessionInfo != null) {
-      sessionInfo.releaseReaders();
-    }
-  }
-
-  /**
-   * Adds documents to one shard of a table. The documents are first reduced
-   * to the indexable ones via {@code BlurValidations}. If this server has an
-   * index for the shard the write happens locally; otherwise the request is
-   * forwarded.
-   *
-   * @param options
-   *          table, shard and visibility / write-ahead-log flags
-   * @param documents
-   *          the documents to add
-   * @return the generations produced by the write
-   * @throws BlurException
-   *           wrapping any failure raised while writing
-   */
-  @Override
-  public List<Generation> addDocuments(MutateOptions options, List<Document> documents) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = getShardIndex(options);
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    try {
-      BlurIndex localIndex = getIndex(table, shardIndex);
-      List<Document> indexable = BlurValidations.getAllIndexableDocuments(documents);
-      List<Generation> generations = new ArrayList<Generation>();
-      if (localIndex != null) {
-        long generation = localIndex.addDocuments(waitToBeVisible, writeAheadLog, indexable);
-        generations.add(new Generation(table, shardIndex, generation));
-      } else {
-        generations.addAll(forwardAddDocuments(options, indexable));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-  
-  /**
-   * Resolves the shard a mutation targets. A non-negative shard index in the
-   * options is used as is; a negative one means "any shard" and a random
-   * shard of the table is chosen.
-   * <p>
-   * The chosen shard is written back into {@code options}. The forwarding
-   * helpers and the remote server read the shard index from the options
-   * again, so without the write-back they would still see the negative value
-   * and could not route the request to the shard selected here.
-   *
-   * @param options
-   *          the mutate options; updated in place when a shard is chosen
-   * @return the resolved, non-negative shard index
-   */
-  private int getShardIndex(MutateOptions options) {
-    int shardIndex = options.getShardIndex();
-    if (shardIndex >= 0) {
-      return shardIndex;
-    }
-    // @TODO this is going to be very slow
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, options.getTable());
-    int shardCount = tableDescriptor.getShardCount();
-    int chosen = new Random().nextInt(shardCount);
-    options.setShardIndex(chosen);
-    return chosen;
-  }
-
-  /**
-   * Waits, generation by generation, until each one is visible. Generations
-   * whose shard has no index on this server are forwarded.
-   *
-   * @param generations
-   *          the generations to wait for
-   * @param forceRefresh
-   *          passed through to the index / remote call
-   * @throws BlurException
-   *           wrapping any failure raised while waiting
-   */
-  @Override
-  public void blockUntilGenerationIsVisible(List<Generation> generations, boolean forceRefresh) throws BlurException, TException {
-    try {
-      for (Generation pending : generations) {
-        BlurIndex localIndex = getIndex(pending.getTable(), pending.getShardIndex());
-        if (localIndex != null) {
-          localIndex.blockUntilGenerationIsVisible(pending.getGeneration(), forceRefresh);
-        } else {
-          forwardBlockUntilGenerationIsVisible(pending, forceRefresh);
-        }
-      }
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  /**
-   * Asks the server returned by {@code getConnection} for the generation's
-   * shard to wait until that single generation is visible.
-   */
-  private void forwardBlockUntilGenerationIsVisible(final Generation generation, final boolean forceRefresh) throws BlurException, TException, IOException {
-    Connection target = getConnection(generation.getTable(), generation.getShardIndex());
-    BlurClientManager.execute(target, new BlurCommand<Void>() {
-      @Override
-      public Void call(Client client) throws BlurException, TException {
-        List<Generation> single = Arrays.asList(generation);
-        client.blockUntilGenerationIsVisible(single, forceRefresh);
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Deletes the documents matching the given queries from one shard of a
-   * table, locally when this server has an index for the shard and by
-   * forwarding otherwise.
-   *
-   * @param options
-   *          table, shard and visibility / write-ahead-log flags
-   * @param queries
-   *          the queries selecting the documents to delete
-   * @return the generations produced by the delete
-   * @throws BlurException
-   *           wrapping any failure raised while deleting
-   */
-  @Override
-  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<org.apache.blur.thrift.generated.Query> queries) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = getShardIndex(options);
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    try {
-      BlurIndex localIndex = getIndex(table, shardIndex);
-      List<Generation> generations = new ArrayList<Generation>();
-      if (localIndex != null) {
-        Query[] queryArray = queries.toArray(new Query[queries.size()]);
-        long generation = localIndex.deleteDocuments(waitToBeVisible, writeAheadLog, queryArray);
-        generations.add(new Generation(table, shardIndex, generation));
-      } else {
-        generations.addAll(forwardDeleteDocumentsByQueries(options, queries));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-  
-  /**
-   * Deletes the documents matching the given terms from one shard of a table,
-   * locally when this server has an index for the shard and by forwarding
-   * otherwise.
-   *
-   * @param options
-   *          table, shard and visibility / write-ahead-log flags
-   * @param terms
-   *          the terms selecting the documents to delete
-   * @return the generations produced by the delete
-   * @throws BlurException
-   *           wrapping any failure raised while deleting
-   */
-  @Override
-  public List<Generation> deleteDocuments(MutateOptions options, List<Term> terms) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = getShardIndex(options);
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    try {
-      BlurIndex localIndex = getIndex(table, shardIndex);
-      List<Generation> generations = new ArrayList<Generation>();
-      if (localIndex != null) {
-        Term[] termArray = terms.toArray(new Term[terms.size()]);
-        long generation = localIndex.deleteDocuments(waitToBeVisible, writeAheadLog, termArray);
-        generations.add(new Generation(table, shardIndex, generation));
-      } else {
-        generations.addAll(forwardDeleteDocuments(options, terms));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  /**
-   * Applies update packages to one shard of a table. The packages are first
-   * reduced to the indexable ones via {@code BlurValidations}; the update then
-   * runs locally when this server has an index for the shard and is forwarded
-   * otherwise.
-   *
-   * @param options
-   *          table, shard and visibility / write-ahead-log flags
-   * @param updatePackages
-   *          the updates to apply
-   * @return the generations produced by the update
-   * @throws BlurException
-   *           wrapping any failure raised while updating
-   */
-  @Override
-  public List<Generation> updateDocuments(MutateOptions options, List<UpdatePackage> updatePackages) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = getShardIndex(options);
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    try {
-      BlurIndex localIndex = getIndex(table, shardIndex);
-      List<UpdatePackage> indexablePackages = BlurValidations.getAllIndexablePackages(updatePackages);
-      List<Generation> generations = new ArrayList<Generation>();
-      if (localIndex != null) {
-        long generation = localIndex.updateDocuments(waitToBeVisible, writeAheadLog, indexablePackages);
-        generations.add(new Generation(table, shardIndex, generation));
-      } else {
-        generations.addAll(forwardUpdateDocuments(options, indexablePackages));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  /**
-   * Returns the index this server holds for a shard of a table.
-   *
-   * @param table
-   *          the table name
-   * @param shardIndex
-   *          the numeric shard index; converted to a shard name with the
-   *          standard shard prefix
-   * @return the index, or {@code null} if the index server returned none
-   *         under that shard name
-   * @throws BException
-   *           if the indexes of the table cannot be fetched
-   */
-  private BlurIndex getIndex(String table, int shardIndex) throws BException {
-    Map<String, BlurIndex> indexesByShard;
-    try {
-      indexesByShard = _indexServer.getIndexes(table);
-    } catch (IOException ioe) {
-      LOG.error("Unknown error while trying to fetch index readers.", ioe);
-      throw new BException(ioe.getMessage(), ioe);
-    }
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardIndex);
-    return indexesByShard.get(shardName);
-  }
-
-  /** Not implemented; always throws a {@link BlurException}. */
-  @Override
-  public List<Integer> serverLayout(String table, String server) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  /** Not implemented; always throws a {@link BlurException}. */
-  @Override
-  public void cancelQuery(Session session, long id) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  /** Not implemented; always throws a {@link BlurException}. */
-  @Override
-  public List<Long> queryStatusIdList(Session session) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  /** Not implemented; always throws a {@link BlurException}. */
-  @Override
-  public QueryStatus queryStatus(Session session, long id) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  /**
-   * Not implemented; always throws a {@link BlurException}.
-   * <p>
-   * The method no longer resolves the session first: the result was never
-   * used, and the lookup could create a session as a side effect of a call
-   * that fails unconditionally.
-   */
-  @Override
-  public TableSchema schema(Session session, List<Integer> shardIds) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  /** @return the table layout used to find the server for a shard */
-  public TableLayout getLayout() {
-    return this._layout;
-  }
-
-  /** @param layout the table layout used to find the server for a shard */
-  public void setLayout(TableLayout layout) {
-    _layout = layout;
-  }
-
-  /**
-   * Sends an add-documents request to the server returned by
-   * {@code getConnection} for the table and shard named in the options.
-   *
-   * @return the generations reported by the remote server
-   */
-  private List<Generation> forwardAddDocuments(final MutateOptions options, final List<Document> documents) throws BlurException, TException, IOException {
-    Connection target = getConnection(options.getTable(), options.getShardIndex());
-    BlurCommand<List<Generation>> remoteAdd = new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.addDocuments(options, documents);
-      }
-    };
-    return BlurClientManager.execute(target, remoteAdd);
-  }
-
-  /**
-   * Sends an update-documents request to the server returned by
-   * {@code getConnection} for the table and shard named in the options.
-   *
-   * @return the generations reported by the remote server
-   */
-  private List<Generation> forwardUpdateDocuments(final MutateOptions options, final List<UpdatePackage> updatePackages) throws BlurException, TException, IOException {
-    Connection target = getConnection(options.getTable(), options.getShardIndex());
-    BlurCommand<List<Generation>> remoteUpdate = new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.updateDocuments(options, updatePackages);
-      }
-    };
-    return BlurClientManager.execute(target, remoteUpdate);
-  }
-
-  /**
-   * Sends a delete-by-queries request to the server returned by
-   * {@code getConnection} for the table and shard named in the options.
-   *
-   * @return the generations reported by the remote server
-   */
-  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<org.apache.blur.thrift.generated.Query> queries) throws BlurException,
-      TException, IOException {
-    Connection target = getConnection(options.getTable(), options.getShardIndex());
-    BlurCommand<List<Generation>> remoteDelete = new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.deleteDocumentsByQueries(options, queries);
-      }
-    };
-    return BlurClientManager.execute(target, remoteDelete);
-  }
-
-  /**
-   * Sends a delete-by-terms request to the server returned by
-   * {@code getConnection} for the table and shard named in the options.
-   *
-   * @return the generations reported by the remote server
-   */
-  private List<Generation> forwardDeleteDocuments(final MutateOptions options, final List<Term> terms) throws BlurException, TException, IOException {
-    Connection target = getConnection(options.getTable(), options.getShardIndex());
-    BlurCommand<List<Generation>> remoteDelete = new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.deleteDocuments(options, terms);
-      }
-    };
-    return BlurClientManager.execute(target, remoteDelete);
-  }
-
-  /**
-   * Builds a connection to the server the layout reports as
-   * {@code WRITABLE} for the given table and shard.
-   */
-  private Connection getConnection(String table, int shardIndex) {
-    return new Connection(_layout.findServer(table, shardIndex, TYPE.WRITABLE));
-  }
-
-  /** Not implemented; always throws a {@link BlurException}. */
-  @Override
-  public Map<String, ShardLayout> shardLayout(String table) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java b/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
deleted file mode 100644
index 77fec2f..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.thrift;
-
-/**
- * Implemented by objects that can be handed a {@link TableContext} and can
- * report the one they currently hold.
- */
-public interface Configurable {
-
-  /** @param context the table context to hand to this object */
-  void setTableContext(TableContext context);
-
-  /** @return the table context this object reports */
-  TableContext getTableContext();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java b/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
deleted file mode 100644
index 9d4b70a..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.thrift;
-
-
-/**
- * Base class for {@link Configurable} objects; it simply stores the table
- * context it is given and hands it back on request.
- */
-public abstract class Configured implements Configurable {
-
-  private TableContext tableContext;
-
-  /** Creates an instance without a table context ({@code null}). */
-  public Configured() {
-    this(null);
-  }
-
-  /** @param context the initial table context, may be {@code null} */
-  public Configured(TableContext context) {
-    tableContext = context;
-  }
-
-  /** @return the stored table context, or {@code null} if none was set */
-  @Override
-  public TableContext getTableContext() {
-    return tableContext;
-  }
-
-  /** @param context the table context to store, replacing any previous one */
-  @Override
-  public void setTableContext(TableContext context) {
-    tableContext = context;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java b/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
deleted file mode 100644
index 6c58883..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.blur.thrift;
-
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
-import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
-import org.apache.blur.thrift.generated.TopFieldDocs;
-import org.apache.lucene.search.Filter;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
-
-public class SearchCallable implements Callable<TopFieldDocs> {
-  private final ScoreDoc after;
-  private final Sort sort;
-  private final Filter filter;
-  private final Query query;
-  private final IndexSearcher searcher;
-  private final int count;
-  private final boolean doDocScores;
-  private final boolean doMaxScore;
-  private final int shardIndex;
-
-  SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
-    this.after = after;
-    this.searcher = searcher;
-    this.query = query;
-    this.filter = filter;
-    this.sort = sort;
-    this.count = count;
-    this.doDocScores = doDocScores;
-    this.doMaxScore = doMaxScore;
-    this.shardIndex = shardIndex;
-  }
-
-  @Override
-  public TopFieldDocs call() throws Exception {
-    return addShardIndex(doSearch());
-  }
-
-  private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
-    topFieldDocs.setShardIndex(shardIndex);
-    return topFieldDocs;
-  }
-
-  private TopFieldDocs doSearch() throws IOException {
-    if (after == null) {
-      if (sort == null) {
-        return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
-      } else {
-        return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
-      }
-    } else {
-      if (sort == null) {
-        return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
-      } else {
-        return toThrift(setShardIndexTopFieldDocs(shardIndex,
-            (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java b/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
deleted file mode 100644
index 9d093af..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-
-public class SessionInfo {
-
-  private static final Log LOG = LogFactory.getLog(SessionInfo.class);
-
-  private String uuid;
-  private BlurAnalyzer analyzer;
-  private Map<Integer, IndexReader> readers = new HashMap<Integer, IndexReader>();
-  private Map<Integer, IndexSearcher> searchers = new HashMap<Integer, IndexSearcher>();
-  private TableDescriptor tableDescriptor;
-
-  public BlurAnalyzer getAnalyzer() {
-    return analyzer;
-  }
-
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    this.analyzer = analyzer;
-  }
-
-  public String getUuid() {
-    return uuid;
-  }
-
-  public void setUuid(String uuid) {
-    this.uuid = uuid;
-  }
-
-  public void add(int index, IndexReader indexReader) {
-    readers.put(index, indexReader);
-  }
-
-  public void add(int index, IndexSearcher indexSearcher) {
-    searchers.put(index, indexSearcher);
-  }
-
-  public Map<Integer, IndexReader> getReaders() {
-    return readers;
-  }
-
-  public void setReaders(Map<Integer, IndexReader> readers) {
-    this.readers = readers;
-  }
-
-  public Map<Integer, IndexSearcher> getSearchers() {
-    return searchers;
-  }
-
-  public void setSearchers(Map<Integer, IndexSearcher> searchers) {
-    this.searchers = searchers;
-  }
-
-  public void releaseReaders() {
-    for (Entry<Integer, IndexReader> entry : readers.entrySet()) {
-      IndexReader reader = entry.getValue();
-      try {
-        reader.decRef();
-      } catch (IOException e) {
-        LOG.error("Unknown exception while trying to decRef on reader [{0}]", e, reader);
-      }
-    }
-  }
-
-  public TableDescriptor getTableDescriptor() {
-    return tableDescriptor;
-  }
-
-  public void setTableDescriptor(TableDescriptor tableDescriptor) {
-    this.tableDescriptor = tableDescriptor;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
deleted file mode 100644
index 48709b9..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.blur.thrift;
-
-import java.io.IOException;
-
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.Directory;
-
-public class ShardContext {
-
-  private String shard;
-  private Path walShardPath;
-  private Path hdfsDirPath;
-  private Directory directory;
-  private TableContext tableContext;
-
-  public TableContext getTableContext() {
-    return tableContext;
-  }
-
-  public void setTableContext(TableContext tableContext) {
-    this.tableContext = tableContext;
-  }
-
-  protected ShardContext() {
-
-  }
-
-  public Directory getDirectory() {
-    return directory;
-  }
-
-  public void setDirectory(Directory directory) {
-    this.directory = directory;
-  }
-
-  public Path getHdfsDirPath() {
-    return hdfsDirPath;
-  }
-
-  public void setHdfsDirPath(Path hdfsDirPath) {
-    this.hdfsDirPath = hdfsDirPath;
-  }
-
-  public String getShard() {
-    return shard;
-  }
-
-  public void setShard(String shard) {
-    this.shard = shard;
-  }
-
-  public Path getWalShardPath() {
-    return walShardPath;
-  }
-
-  public void setWalShardPath(Path walShardPath) {
-    this.walShardPath = walShardPath;
-  }
-
-  public static ShardContext create(TableContext tableContext, String shard) throws IOException {
-    ShardContext shardContext = new ShardContext();
-    shardContext.tableContext = tableContext;
-    shardContext.walShardPath = new Path(tableContext.getWalTablePath(), shard);
-    shardContext.hdfsDirPath = new Path(tableContext.getTablePath(), shard);
-    shardContext.shard = shard;
-    shardContext.directory = new HdfsDirectory(tableContext.getConfiguration(), shardContext.hdfsDirPath);
-    return shardContext;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
deleted file mode 100644
index 77f0eda..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ /dev/null
@@ -1,239 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.List;
-import java.util.Map;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.clusterstatus.ClusterStatus;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.ZooKeeper;
-
-
-public abstract class TableAdmin implements Iface {
-
-  private static final Log LOG = LogFactory.getLog(TableAdmin.class);
-  protected ZooKeeper _zookeeper;
-  protected ClusterStatus _clusterStatus;
-  protected BlurConfiguration _configuration;
-  private String cluster;
-
-  @Override
-  public boolean isInSafeMode() throws BlurException, TException {
-    try {
-      return _clusterStatus.isInSafeMode(true);
-    } catch (Exception e) {
-      LOG.error("Unknown error during safe mode check of [cluster={0}]", e, cluster);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public final void createTable(TableDescriptor tableDescriptor) throws BlurException, TException {
-    try {
-      _clusterStatus.createTable(tableDescriptor);
-    } catch (Exception e) {
-      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name, tableDescriptor);
-      throw new BException(e.getMessage(), e);
-    }
-    if (tableDescriptor.isEnabled()) {
-      enableTable(tableDescriptor.getName());
-    }
-  }
-
-  @Override
-  public final void disableTable(String table) throws BlurException, TException {
-    try {
-      _clusterStatus.disableTable(table);
-      waitForTheTableToDisable(table);
-      waitForTheTableToDisengage(table);
-    } catch (Exception e) {
-      LOG.error("Unknown error during disable of [table={0}]", e, table);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  private void waitForTheTableToDisengage(String table) throws BlurException, TException {
-    // LOG.info("Waiting for shards to disengage on table [" + table + "]");
-  }
-
-  private void waitForTheTableToDisable(String table) throws BlurException, TException {
-    LOG.info("Waiting for shards to disable on table [" + table + "]");
-    while (true) {
-      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
-      if (!tableDescriptor.isEnabled()) {
-        return;
-      }
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        LOG.error("Unknown error while enabling table [" + table + "]", e);
-        throw new BException("Unknown error while enabling table [" + table + "]", e);
-      }
-    }
-  }
-
-  @Override
-  public final void enableTable(String table) throws BlurException, TException {
-    try {
-      _clusterStatus.enableTable(table);
-      waitForTheTableToEnable(table);
-      waitForTheTableToEngage(table);
-    } catch (Exception e) {
-      LOG.error("Unknown error during enable of [table={0}]", e, table);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  private void waitForTheTableToEnable(String table) throws BlurException {
-    LOG.info("Waiting for shards to engage on table [" + table + "]");
-    while (true) {
-      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
-      if (tableDescriptor.isEnabled()) {
-        return;
-      }
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        LOG.error("Unknown error while enabling table [" + table + "]", e);
-        throw new BException("Unknown error while enabling table [" + table + "]", e);
-      }
-    }
-  }
-
-  private void waitForTheTableToEngage(String table) throws BlurException, TException {
-    LOG.info("IMPLEMENT - Waiting for shards to engage on table [" + table + "]");
-    
-//    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
-//    int shardCount = tableDescriptor.shardCount;
-//    LOG.info("Waiting for shards to engage on table [" + table + "]");
-//    while (true) {
-//      try {
-//        Thread.sleep(3000);
-//      } catch (InterruptedException e) {
-//        LOG.error("Unknown error while engaging table [" + table + "]", e);
-//        throw new BException("Unknown error while engaging table [" + table + "]", e);
-//      }
-//      try {
-//        Map<String, String> shardServerLayout = getLayout();//shardServerLayout(table);
-//        LOG.info("Shards [" + shardServerLayout.size() + "/" + shardCount + "] of table [" + table + "] engaged");
-//        if (shardServerLayout.size() == shardCount) {
-//          return;
-//        }
-//      } catch (BlurException e) {
-//        LOG.info("Stilling waiting", e);
-//      } catch (TException e) {
-//        LOG.info("Stilling waiting", e);
-//      }
-//    }
-  }
-
-  private Map<String, String> getLayout() throws BlurException, TException {
-    return null;
-  }
-
-  @Override
-  public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
-    try {
-      _clusterStatus.removeTable(table, deleteIndexFiles);
-    } catch (Exception e) {
-      LOG.error("Unknown error during remove of [table={0}]", e, table);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  public void checkTable(String table) throws BlurException {
-    if (table == null) {
-      throw new BlurException("Table cannot be null.", null);
-    }
-    boolean inSafeMode = _clusterStatus.isInSafeMode(true);
-    if (inSafeMode) {
-      throw new BlurException("Cluster for [" + table + "] is in safe mode", null);
-    }
-    boolean exists = _clusterStatus.exists(true, table);
-    if (exists) {
-      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
-      if (tableDescriptor.isEnabled()) {
-        return;
-      }
-      throw new BlurException("Table [" + table + "] exists, but is not enabled", null);
-    } else {
-      throw new BlurException("Table [" + table + "] does not exist", null);
-    }
-  }
-
-
-  public void checkForUpdates(String table) throws BlurException {
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
-    if (tableDescriptor.isReadOnly()) {
-      throw new BlurException("Table [" + table + "] in cluster [" + cluster + "] is read only.", null);
-    }
-  }
-
-
-  @Override
-  public final List<String> serverList() throws BlurException, TException {
-    try {
-      return _clusterStatus.getServerList(true);
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get a shard server list.", e);
-      throw new BException("Unknown error while trying to get a shard server list.", e);
-    }
-  }
-
-  @Override
-  public final TableDescriptor describe(final String table) throws BlurException, TException {
-    try {
-      return _clusterStatus.getTableDescriptor(true, table);
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to describe a table [" + table + "].", e);
-      throw new BException("Unknown error while trying to describe a table [" + table + "].", e);
-    }
-  }
-
-  @Override
-  public final List<String> tableList() throws BlurException, TException {
-    try {
-      return _clusterStatus.getTableList(true);
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get a table list.", e);
-      throw new BException("Unknown error while trying to get a table list.", e);
-    }
-  }
-
-  public ClusterStatus getClusterStatus() {
-    return _clusterStatus;
-  }
-
-  public void setClusterStatus(ClusterStatus clusterStatus) {
-    _clusterStatus = clusterStatus;
-  }
-
-  public void setZookeeper(ZooKeeper zookeeper) {
-    _zookeeper = zookeeper;
-  }
-  
-  public void setConfiguration(BlurConfiguration config) {
-    _configuration = config;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
deleted file mode 100644
index 0e6b41a..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.blur.thrift;
-
-import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS;
-import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_SIMILARITY_CLASS;
-import static org.apache.blur.utils.BlurConstants.BLUR_QUERY_CONVERTER_CLASS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.search.QueryConverter;
-import org.apache.blur.lucene.search.QueryConverterImpl;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.search.similarities.DefaultSimilarity;
-import org.apache.lucene.search.similarities.Similarity;
-
-public class TableContext {
-  
-  private static final Log LOG = LogFactory.getLog(TableContext.class);
-
-  private static final String LOGS = "logs";
-
-  private Path tablePath;
-  private Path walTablePath;
-  private BlurAnalyzer analyzer;
-  private String defaultFieldName;
-  private String table;
-  private IndexDeletionPolicy indexDeletionPolicy;
-  private Similarity similarity;
-  private QueryConverter queryConverter;
-  private Configuration configuration;
-  private TableDescriptor descriptor;
-  private long timeBetweenCommits;
-  private long timeBetweenRefreshs;
-
-  protected TableContext() {
-
-  }
-
-  public static TableContext create(TableDescriptor tableDescriptor) {
-    LOG.info("Creating table context for table [{0}]", tableDescriptor.getName());
-    Configuration configuration = new Configuration();
-    Map<String, String> properties = tableDescriptor.getProperties();
-    if (properties != null) {
-      for (Entry<String, String> prop : properties.entrySet()) {
-        configuration.set(prop.getKey(), prop.getValue());
-      }
-    }
-
-    TableContext tableContext = new TableContext();
-    tableContext.configuration = configuration;
-    tableContext.tablePath = new Path(tableDescriptor.getStoragePath());
-    tableContext.walTablePath = new Path(tableContext.tablePath, LOGS);
-    tableContext.analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzer());
-    tableContext.defaultFieldName = tableDescriptor.getDefaultFieldName();
-    tableContext.table = tableDescriptor.getName();
-    tableContext.descriptor = tableDescriptor;
-    tableContext.timeBetweenCommits = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000);
-    tableContext.timeBetweenRefreshs = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 5000);
-
-    Class<?> c1 = configuration.getClass(BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS, KeepOnlyLastCommitDeletionPolicy.class);
-    tableContext.indexDeletionPolicy = (IndexDeletionPolicy) configure(ReflectionUtils.newInstance(c1, configuration), tableContext);
-    Class<?> c2 = configuration.getClass(BLUR_LUCENE_INDEX_SIMILARITY_CLASS, DefaultSimilarity.class);
-    tableContext.similarity = (Similarity) configure(ReflectionUtils.newInstance(c2, configuration), tableContext);
-    Class<?> c3 = configuration.getClass(BLUR_QUERY_CONVERTER_CLASS, QueryConverterImpl.class);
-    tableContext.queryConverter = (QueryConverter) configure(ReflectionUtils.newInstance(c3, configuration), tableContext);
-    return tableContext;
-  }
-
-  private static Object configure(Object o, TableContext tableContext) {
-    if (o instanceof Configurable) {
-      ((Configurable) o).setTableContext(tableContext);
-    }
-    return o;
-  }
-
-  public IndexDeletionPolicy getIndexDeletionPolicy() {
-    return indexDeletionPolicy;
-  }
-
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    this.indexDeletionPolicy = indexDeletionPolicy;
-  }
-
-  public Similarity getSimilarity() {
-    return similarity;
-  }
-
-  public void setSimilarity(Similarity similarity) {
-    this.similarity = similarity;
-  }
-
-  public long getTimeBetweenCommits() {
-    return timeBetweenCommits;
-  }
-
-  public void setTimeBetweenCommits(long timeBetweenCommits) {
-    this.timeBetweenCommits = timeBetweenCommits;
-  }
-
-  public long getTimeBetweenRefreshs() {
-    return timeBetweenRefreshs;
-  }
-
-  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
-    this.timeBetweenRefreshs = timeBetweenRefreshs;
-  }
-
-  public BlurAnalyzer getAnalyzer() {
-    return analyzer;
-  }
-
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    this.analyzer = analyzer;
-  }
-
-  public String getTable() {
-    return table;
-  }
-
-  public void setTable(String table) {
-    this.table = table;
-  }
-
-  public QueryConverter getQueryConverter() {
-    return queryConverter;
-  }
-
-  public void setQueryConverter(QueryConverter queryConverter) {
-    this.queryConverter = queryConverter;
-  }
-
-  public Configuration getConfiguration() {
-    return configuration;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  public TableDescriptor getDescriptor() {
-    return descriptor;
-  }
-
-  public void setDescriptor(TableDescriptor descriptor) {
-    this.descriptor = descriptor;
-  }
-
-  public Path getTablePath() {
-    return tablePath;
-  }
-
-  public void setTablePath(Path tablePath) {
-    this.tablePath = tablePath;
-  }
-
-  public Path getWalTablePath() {
-    return walTablePath;
-  }
-
-  public void setWalTablePath(Path walTablePath) {
-    this.walTablePath = walTablePath;
-  }
-
-  public String getDefaultFieldName() {
-    return defaultFieldName;
-  }
-
-  public void setDefaultFieldName(String defaultFieldName) {
-    this.defaultFieldName = defaultFieldName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
deleted file mode 100644
index e1628e7..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.blur.thrift;
-
-public abstract class TableLayout {
-
-  public enum TYPE {
-    WRITABLE, ONLINE, BACKUP
-  }
-  
-  public abstract String findServer(String table, int shard, TYPE type);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
index 4449908..8e59dc5 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
@@ -57,6 +57,8 @@ import org.apache.blur.manager.indexserver.DistributedIndexServer;
 import org.apache.blur.manager.indexserver.DistributedLayoutManager;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.server.BlurServer;
+import org.apache.blur.server.TableLayout;
 import org.apache.blur.store.blockcache.BlockCache;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.BlockDirectoryCache;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java b/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
index 12163ce..f093ca2 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
@@ -8,6 +8,7 @@ import java.util.List;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.serializer.QueryWritable;
+import org.apache.blur.server.TypeChecker;
 import org.apache.blur.thrift.generated.Field;
 import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.ScoreDoc;
@@ -114,11 +115,11 @@ public class ThriftLuceneConversion {
         result.setType(TYPE.BINARY);
         result.setValue(toByteBuffer(bytes));
       } else {
-        //@TODO this is a bit of a hack
+        // @TODO this is a bit of a hack
         if (field.fieldType().omitNorms()) {
           result.setType(TYPE.STRING);
-          result.setValue(toByteBuffer(field.stringValue()));  
-        } else{
+          result.setValue(toByteBuffer(field.stringValue()));
+        } else {
           result.setType(TYPE.TEXT);
           result.setValue(toByteBuffer(field.stringValue()));
         }
@@ -227,16 +228,17 @@ public class ThriftLuceneConversion {
     }
   }
 
-  public static Document toLucene(org.apache.blur.thrift.generated.Document document) throws IOException {
+  public static Document toLucene(org.apache.blur.thrift.generated.Document document, TypeChecker typeChecker) throws IOException {
     Document result = new Document();
     List<Field> fields = document.getFields();
     for (Field field : fields) {
-      result.add(toLucene(field));
+      result.add(toLucene(field, typeChecker));
     }
     return result;
   }
 
-  public static IndexableField toLucene(Field field) throws IOException {
+  public static IndexableField toLucene(Field field, TypeChecker typeChecker) throws IOException {
+    typeChecker.validate(field);
     TYPE type = field.getType();
     org.apache.lucene.document.Field result;
     switch (type) {
@@ -303,10 +305,10 @@ public class ThriftLuceneConversion {
     return result;
   }
 
-  public static List<Document> toLucene(List<org.apache.blur.thrift.generated.Document> documents) throws IOException {
+  public static List<Document> toLucene(List<org.apache.blur.thrift.generated.Document> documents, TypeChecker typeChecker) throws IOException {
     List<Document> docs = new ArrayList<Document>();
     for (org.apache.blur.thrift.generated.Document doc : documents) {
-      docs.add(toLucene(doc));
+      docs.add(toLucene(doc, typeChecker));
     }
     return docs;
   }
@@ -326,7 +328,7 @@ public class ThriftLuceneConversion {
     qw.readFields(in);
     return qw.getQuery();
   }
-  
+
   public static Query toLuceneQuery(QueryArgs queryArgs) {
     return toLuceneQuery(queryArgs.getQuery());
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index e81e76a..57818c0 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -29,8 +29,8 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.concurrent.Executors;
-import org.apache.blur.thrift.ShardContext;
-import org.apache.blur.thrift.TableContext;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Field;
 import org.apache.blur.thrift.generated.TYPE;
@@ -86,7 +86,7 @@ public class BlurNRTIndexTest {
     tableDescriptor.setStoragePath(new File(base, "index").toURI().toString());
     tableDescriptor.setName("testing-table");
     tableDescriptor.putToProperties(BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS, Long.toString(refresh));
-    TableContext context = TableContext.create(tableDescriptor);
+    TableContext context = TableContext.create(tableDescriptor, null);
     writer.setContext(ShardContext.create(context, "testing-shard"));
     // writer.setAnalyzer(analyzer);
     // writer.setTable("testing-table");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-util/src/main/java/org/apache/blur/CachedMap.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/CachedMap.java b/src/blur-util/src/main/java/org/apache/blur/CachedMap.java
new file mode 100644
index 0000000..1ed8c36
--- /dev/null
+++ b/src/blur-util/src/main/java/org/apache/blur/CachedMap.java
@@ -0,0 +1,53 @@
+package org.apache.blur;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+public abstract class CachedMap {
+
+  /**
+   * Clears the in-memory cache of the map; this forces a re-read from the
+   * source.
+   */
+  public abstract void clearCache() throws IOException;
+
+  /**
+   * Fetches the value by key. If the in-memory cache is missing the value, it
+   * is re-read from the source; if it is missing from the source, returns null.
+   * 
+   * @param key
+   *          the key.
+   * @return the value.
+   * @throws IOException
+   */
+  public abstract String get(String key) throws IOException;
+
+  /**
+   * Puts the value with the given key into the map if the key was missing.
+   * Returns true if the key with the given value was set otherwise false if a
+   * key already existed.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   * @return boolean true if successful, false if not.
+   */
+  public abstract boolean putIfMissing(String key, String value) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2924318/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
new file mode 100644
index 0000000..d0237e5
--- /dev/null
+++ b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
@@ -0,0 +1,193 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.CachedMap;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This is a simple implementation of a set-once map of string-to-string that
+ * is backed by ZooKeeper. Meaning that once the value is set a single time it
+ * cannot be set to a different value. The clear cache method is called when the
+ * internal cache is to be cleared and re-read from ZooKeeper. <br>
+ * <br>
+ * Values are stored in ZooKeeper as UTF-8 encoded bytes.<br>
+ * <br>
+ * Usage:<br>
+ * <br>
+ * ZkCachedMap map = new ZkCachedMap(zooKeeper, path);<br>
+ * String key = "key";<br>
+ * String newValue = "value";<br>
+ * String value = map.get(key);<br>
+ * if (value == null) {<br>
+ * &nbsp;&nbsp;if (map.putIfMissing(key, newValue)) {<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;System.out.println("Yay! My value was taken.");<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;value = newValue;<br>
+ * &nbsp;&nbsp;} else {<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;System.out.println("Boo! Someone beat me to it.");<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;value = map.get(key);<br>
+ * &nbsp;&nbsp;}<br>
+ * }<br>
+ * System.out.println("key [" + key + "] value [" + value + "]");<br>
+ * 
+ */
+public class ZkCachedMap extends CachedMap {
+
+  /** Separates the caller's key from the sequence suffix of the znode name. */
+  private static final String SEP = "-";
+
+  /**
+   * Charset used for every byte/String conversion, so that the stored data
+   * does not depend on the platform default charset of whichever JVM wrote
+   * or reads it.
+   */
+  private static final String UTF_8 = "UTF-8";
+
+  private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
+  private final ZooKeeper zooKeeper;
+  private final String basePath;
+
+  /**
+   * Creates a map whose entries are stored as children of the given path.
+   * 
+   * @param zooKeeper
+   *          the ZooKeeper client used for all reads and writes.
+   * @param basePath
+   *          the znode path under which the entries are created.
+   */
+  public ZkCachedMap(ZooKeeper zooKeeper, String basePath) {
+    this.zooKeeper = zooKeeper;
+    this.basePath = basePath;
+  }
+
+  /**
+   * Drops every entry of the in memory cache; ZooKeeper is not touched.
+   */
+  @Override
+  public void clearCache() {
+    cache.clear();
+  }
+
+  /**
+   * Checks the in memory map first, then fetches from ZooKeeper.
+   * 
+   * @param key
+   *          the key.
+   * @return the value, null if it does not exist.
+   * @exception IOException
+   *              if there is an io error.
+   */
+  @Override
+  public String get(String key) throws IOException {
+    String value = cache.get(key);
+    if (value != null) {
+      return value;
+    }
+    return getFromZooKeeper(key);
+  }
+
+  /**
+   * Checks the in memory map first, if the key exists there then return
+   * false. If missing then try to create the entry in ZooKeeper.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   * @return boolean, true if the put was successful, false if a value already
+   *         exists.
+   * @exception IOException
+   *              if there is an io error.
+   */
+  @Override
+  public boolean putIfMissing(String key, String value) throws IOException {
+    String existingValue = cache.get(key);
+    if (existingValue != null) {
+      return false;
+    }
+    return putIfMissingFromZooKeeper(key, value);
+  }
+
+  /**
+   * Lists the children of the base path in sorted order and reads the data of
+   * the first child whose real key (name without the sequence suffix) equals
+   * the given key. A value that is found is also stored in the in memory
+   * cache.
+   * 
+   * @param key
+   *          the key.
+   * @return the value, null if no matching child exists or it holds no data.
+   * @exception IOException
+   *              if ZooKeeper reports an error or the thread is interrupted.
+   */
+  private String getFromZooKeeper(String key) throws IOException {
+    try {
+      List<String> keys = new ArrayList<String>(zooKeeper.getChildren(basePath, false));
+      Collections.sort(keys);
+      for (String k : keys) {
+        String realKey = getRealKey(k);
+        if (realKey.equals(key)) {
+          String path = getPath(k);
+          Stat stat = zooKeeper.exists(path, false);
+          if (stat == null) {
+            return null;
+          }
+          byte[] data = zooKeeper.getData(path, false, stat);
+          if (data == null) {
+            return null;
+          }
+          // Decode with an explicit charset; the platform default may differ
+          // between the JVM that wrote the value and this one.
+          String value = new String(data, UTF_8);
+          cache.put(key, value);
+          return value;
+        }
+      }
+      return null;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Creates a persistent sequential child for the key and then checks, in
+   * sorted order, which child with the same real key comes first. If it is the
+   * child just created the put succeeded and the value is cached; otherwise
+   * the child just created is deleted again.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   * @return boolean, true if the created child is the first one for the key,
+   *         false otherwise.
+   * @exception IOException
+   *              if ZooKeeper reports an error or the thread is interrupted.
+   */
+  private boolean putIfMissingFromZooKeeper(String key, String value) throws IOException {
+    try {
+      String path = getPath(key);
+      // Encode with an explicit charset so every reader decodes the same text.
+      String newPath = zooKeeper.create(path + SEP, value.getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      String keyWithSeq = getKeyWithSeq(newPath);
+      List<String> keys = new ArrayList<String>(zooKeeper.getChildren(basePath, false));
+      Collections.sort(keys);
+      for (String k : keys) {
+        String realKey = getRealKey(k);
+        if (realKey.equals(key)) {
+          if (keyWithSeq.equals(k)) {
+            // got the lock
+            cache.put(key, value);
+            return true;
+          } else {
+            // remove duplicate key
+            zooKeeper.delete(newPath, -1);
+            return false;
+          }
+        }
+      }
+      return false;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns the last path element of the given path, i.e. the child name
+   * including its sequence suffix.
+   */
+  private String getKeyWithSeq(String newPath) {
+    int lastIndexOf = newPath.lastIndexOf('/');
+    if (lastIndexOf < 0) {
+      throw new RuntimeException("Path [" + newPath + "] does not contain [/]");
+    }
+    return newPath.substring(lastIndexOf + 1);
+  }
+
+  /**
+   * Strips everything from the last separator onwards, leaving the key the
+   * caller supplied.
+   */
+  private String getRealKey(String keyWithSeq) {
+    int lastIndexOf = keyWithSeq.lastIndexOf(SEP);
+    if (lastIndexOf < 0) {
+      throw new RuntimeException("Key [" + keyWithSeq + "] does not contain [" + SEP + "]");
+    }
+    return keyWithSeq.substring(0, lastIndexOf);
+  }
+
+  /**
+   * Returns the path of the child with the given name below the base path.
+   */
+  private String getPath(String key) {
+    return basePath + "/" + key;
+  }
+
+}


Mime
View raw message