incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [45/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:58:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
new file mode 100644
index 0000000..609b7d0
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -0,0 +1,398 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.ShardServerContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.QueryCache;
+import org.apache.blur.utils.QueryCacheEntry;
+import org.apache.blur.utils.QueryCacheKey;
+
+public class BlurShardServer extends TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(BlurShardServer.class);
+  private IndexManager _indexManager;
+  private IndexServer _indexServer;
+  private boolean _closed;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private QueryCache _queryCache;
+  private BlurQueryChecker _queryChecker;
+  private ExecutorService _dataFetch;
+  private String _cluster = BlurConstants.BLUR_CLUSTER;
+  private int _dataFetchThreadCount = 32;
+
+  public void init() throws BlurException {
+    _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
+    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+
+    if (_configuration == null) {
+      throw new BException("Configuration must be set before initialization.");
+    }
+    _cluster = _configuration.get(BlurConstants.BLUR_CLUSTER_NAME, BlurConstants.BLUR_CLUSTER);
+    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
+    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
+    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
+  }
+
+  @Override
+  public BlurResults query(String table, BlurQuery blurQuery) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    _queryChecker.checkQuery(blurQuery);
+    try {
+      BlurQuery original = new BlurQuery(blurQuery);
+      if (blurQuery.useCacheIfPresent) {
+        LOG.debug("Using cache for query [{0}] on table [{1}].", blurQuery, table);
+        QueryCacheKey key = QueryCache.getNormalizedBlurQueryKey(table, blurQuery);
+        QueryCacheEntry queryCacheEntry = _queryCache.get(key);
+        if (_queryCache.isValid(queryCacheEntry, _indexServer.getShardListCurrentServerOnly(table))) {
+          LOG.debug("Cache hit for query [{0}] on table [{1}].", blurQuery, table);
+          return queryCacheEntry.getBlurResults(blurQuery);
+        } else {
+          _queryCache.remove(key);
+        }
+      }
+      BlurUtil.setStartTime(original);
+      BlurResultIterable hitsIterable = null;
+      try {
+        AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
+        hitsIterable = _indexManager.query(table, blurQuery, facetCounts);
+        return _queryCache.cache(table, original,
+            BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _dataFetch, blurQuery.selector, this, table));
+      } catch (Exception e) {
+        LOG.error("Unknown error during search of [table={0},searchQuery={1}]", e, table, blurQuery);
+        throw new BException(e.getMessage(), e);
+      } finally {
+        if (hitsIterable != null) {
+          hitsIterable.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Unknown error during search of [table={0},searchQuery={1}]", e, table, blurQuery);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      FetchResult fetchResult = new FetchResult();
+      _indexManager.fetchRow(table, selector, fetchResult);
+      return fetchResult;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get fetch row [table={0},selector={1}]", e, table, selector);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void cancelQuery(String table, long uuid) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      _indexManager.cancelQuery(table, uuid);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to cancel search [uuid={0}]", e, uuid);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void resetSearchers() {
+    ShardServerContext.resetSearchers();
+  }
+
+  @Override
+  public List<BlurQueryStatus> currentQueries(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      return _indexManager.currentQueries(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current search status [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public TableStats tableStats(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      TableStats tableStats = new TableStats();
+      tableStats.tableName = table;
+      tableStats.recordCount = _indexServer.getRecordCount(table);
+      tableStats.rowCount = _indexServer.getRowCount(table);
+      tableStats.bytes = _indexServer.getTableSize(table);
+      tableStats.queries = 0;
+      return tableStats;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _indexManager.close();
+      _dataFetch.shutdownNow();
+    }
+  }
+
+  @Override
+  public Map<String, String> shardServerLayout(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
+      Map<String, String> result = new TreeMap<String, String>();
+      String nodeName = _indexServer.getNodeName();
+      for (String shard : blurIndexes.keySet()) {
+        result.put(shard, nodeName);
+      }
+      return result;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to getting shardServerLayout for table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, ShardState>> shardServerLayoutState(String table) throws BlurException, TException {
+    resetSearchers();
+    try {
+      Map<String, Map<String, ShardState>> result = new TreeMap<String, Map<String, ShardState>>();
+      String nodeName = _indexServer.getNodeName();
+      Map<String, ShardState> stateMap = _indexServer.getShardState(table);
+      for (Entry<String, ShardState> entry : stateMap.entrySet()) {
+        result.put(entry.getKey(), newMap(nodeName, entry.getValue()));
+      }
+      return result;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to getting shardServerLayoutState for table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private Map<String, ShardState> newMap(String nodeName, ShardState state) {
+    Map<String, ShardState> map = new HashMap<String, ShardState>();
+    map.put(nodeName, state);
+    return map;
+  }
+
+  @Override
+  public long recordFrequency(String table, String columnFamily, String columnName, String value) throws BlurException,
+      TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      return _indexManager.recordFrequency(table, columnFamily, columnName, value);
+    } catch (Exception e) {
+      LOG.error(
+          "Unknown error while trying to get record frequency for [table={0},columnFamily={1},columnName={2},value={3}]",
+          e, table, columnFamily, columnName, value);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Schema schema(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      return _indexManager.schema(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get schema for table [{0}={1}]", e, "table", table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public List<String> terms(String table, String columnFamily, String columnName, String startWith, short size)
+      throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      return _indexManager.terms(table, columnFamily, columnName, startWith, size);
+    } catch (Exception e) {
+      LOG.error(
+          "Unknown error while trying to get terms list for [table={0},columnFamily={1},columnName={2},startWith={3},size={4}]",
+          e, table, columnFamily, columnName, startWith, size);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void mutate(RowMutation mutation) throws BlurException, TException {
+    checkTable(_cluster, mutation.table);
+    checkForUpdates(_cluster, mutation.table);
+    resetSearchers();
+    MutationHelper.validateMutation(mutation);
+    try {
+      _indexManager.mutate(mutation);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutation={0}]", e, mutation);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    resetSearchers();
+    long s = System.nanoTime();
+    for (RowMutation mutation : mutations) {
+      checkTable(_cluster, mutation.table);
+      checkForUpdates(_cluster, mutation.table);
+      MutationHelper.validateMutation(mutation);
+    }
+    try {
+      _indexManager.mutate(mutations);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutations={0}]", e, mutations);
+      throw new BException(e.getMessage(), e);
+    }
+    long e = System.nanoTime();
+    LOG.debug("mutateBatch took [" + (e - s) / 1000000.0 + " ms] to complete");
+  }
+
+  public long getMaxTimeToLive() {
+    return _maxTimeToLive;
+  }
+
+  public void setMaxTimeToLive(long maxTimeToLive) {
+    _maxTimeToLive = maxTimeToLive;
+  }
+
+  public int getMaxQueryCacheElements() {
+    return _maxQueryCacheElements;
+  }
+
+  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
+    _maxQueryCacheElements = maxQueryCacheElements;
+  }
+
+  public void setQueryChecker(BlurQueryChecker queryChecker) {
+    _queryChecker = queryChecker;
+  }
+
+  public void setIndexManager(IndexManager indexManager) {
+    _indexManager = indexManager;
+  }
+
+  public void setIndexServer(IndexServer indexServer) {
+    _indexServer = indexServer;
+  }
+
+  @Override
+  public BlurQueryStatus queryStatusById(String table, long uuid) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    BlurQueryStatus blurQueryStatus;
+    try {
+      blurQueryStatus = _indexManager.queryStatus(table, uuid);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current query status [table={0},uuid={1}]", e, table, uuid);
+      throw new BException(e.getMessage(), e);
+    }
+    if (blurQueryStatus == null) {
+      throw new BlurException("Query status for table [" + table + "] and uuid [" + uuid + "] not found", null);
+    }
+    return blurQueryStatus;
+  }
+
+  @Override
+  public List<Long> queryStatusIdList(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      return _indexManager.queryStatusIdList(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status id list [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void optimize(String table, int numberOfSegmentsPerShard) throws BlurException, TException {
+    checkTable(_cluster, table);
+    resetSearchers();
+    try {
+      _indexManager.optimize(table, numberOfSegmentsPerShard);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table,
+          numberOfSegmentsPerShard);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public int getDataFetchThreadCount() {
+    return _dataFetchThreadCount;
+  }
+
+  public void setDataFetchThreadCount(int dataFetchThreadCount) {
+    _dataFetchThreadCount = dataFetchThreadCount;
+  }
+
+  public void setConfiguration(BlurConfiguration conf) {
+    _configuration = conf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java b/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
new file mode 100644
index 0000000..3ca394e
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
@@ -0,0 +1,62 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+
+public class MutationHelper {
+
+  public static String getShardName(String table, String rowId, int numberOfShards, BlurPartitioner blurPartitioner) {
+    int partition = blurPartitioner.getShard(rowId, numberOfShards);
+    return BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, partition);
+  }
+
+  public static void validateMutation(RowMutation mutation) {
+    if (mutation == null) {
+      throw new NullPointerException("Mutation can not be null.");
+    }
+    if (mutation.rowId == null) {
+      throw new NullPointerException("Rowid can not be null in mutation.");
+    }
+    if (mutation.table == null) {
+      throw new NullPointerException("Table can not be null in mutation.");
+    }
+  }
+
+  public static Row getRowFromMutations(String id, List<RecordMutation> recordMutations) {
+    Row row = new Row().setId(id);
+    for (RecordMutation mutation : recordMutations) {
+      Record record = mutation.getRecord();
+      switch (mutation.recordMutationType) {
+      case REPLACE_ENTIRE_RECORD:
+        row.addToRecords(record);
+        break;
+      default:
+        throw new RuntimeException("Not supported [" + mutation.recordMutationType + "]");
+      }
+    }
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
new file mode 100644
index 0000000..8069764
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -0,0 +1,389 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.zookeeper.ZooKeeper;
+
+public abstract class TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(TableAdmin.class);
+  protected ZooKeeper _zookeeper;
+  protected ClusterStatus _clusterStatus;
+  protected BlurConfiguration _configuration;
+
+  @Override
+  public TableStats getTableStats(String table) throws BlurException, TException {
+    return tableStats(table);
+  }
+
+  @Override
+  public boolean isInSafeMode(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.isInSafeMode(true, cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error during safe mode check of [cluster={0}]", e, cluster);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public final void createTable(TableDescriptor tableDescriptor) throws BlurException, TException {
+    try {
+      TableContext.clear();
+      BlurUtil.validateTableName(tableDescriptor.name);
+      // @todo Remove this once issue #27 is resolved
+      if (tableDescriptor.compressionBlockSize > 32768) {
+        tableDescriptor.compressionBlockSize = 32768;
+      } else if (tableDescriptor.compressionBlockSize < 8192) {
+        tableDescriptor.compressionBlockSize = 8192;
+      }
+      _clusterStatus.createTable(tableDescriptor);
+    } catch (Exception e) {
+      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name,
+          tableDescriptor);
+      throw new BException(e.getMessage(), e);
+    }
+    if (tableDescriptor.isEnabled) {
+      enableTable(tableDescriptor.name);
+    }
+  }
+
+  @Override
+  public final void disableTable(String table) throws BlurException, TException {
+    try {
+      TableContext.clear();
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.disableTable(cluster, table);
+      waitForTheTableToDisable(cluster, table);
+      waitForTheTableToDisengage(cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during disable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public final void enableTable(String table) throws BlurException, TException {
+    try {
+      TableContext.clear();
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.enableTable(cluster, table);
+      waitForTheTableToEnable(cluster, table);
+      waitForTheTableToEngage(cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during enable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void waitForTheTableToEnable(String cluster, String table) throws BlurException {
+    LOG.info("Waiting for shards to engage on table [" + table + "]");
+    while (true) {
+      if (_clusterStatus.isEnabled(false, cluster, table)) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
+  /**
+   * This method only works on controllers, if called on shard servers it will
+   * only wait itself to finish not the whole cluster.
+   */
+  private void waitForTheTableToEngage(String cluster, String table) throws BlurException, TException {
+    TableDescriptor describe = describe(table);
+    int shardCount = describe.shardCount;
+    LOG.info("Waiting for shards to engage on table [" + table + "]");
+    while (true) {
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while engaging table [" + table + "]", e);
+        throw new BException("Unknown error while engaging table [" + table + "]", e);
+      }
+      try {
+        Map<String, Map<String, ShardState>> shardServerLayoutState = shardServerLayoutState(table);
+
+        int countNumberOfOpen = 0;
+        int countNumberOfOpening = 0;
+        for (Entry<String, Map<String, ShardState>> shardEntry : shardServerLayoutState.entrySet()) {
+          Map<String, ShardState> value = shardEntry.getValue();
+          for (ShardState state : value.values()) {
+            if (state == ShardState.OPEN) {
+              countNumberOfOpen++;
+            } else if (state == ShardState.OPENING) {
+              countNumberOfOpening++;
+            } else {
+              LOG.warn("Unexpected state of [{0}] for shard [{1}].", state, shardEntry.getKey());
+            }
+          }
+        }
+        LOG.info("Opening - Shards Open [{0}], Shards Opening [{1}] of table [{2}]", countNumberOfOpen,
+            countNumberOfOpening, table);
+        if (countNumberOfOpen == shardCount && countNumberOfOpening == 0) {
+          return;
+        }
+      } catch (BlurException e) {
+        LOG.info("Stilling waiting", e);
+      } catch (TException e) {
+        LOG.info("Stilling waiting", e);
+      }
+    }
+  }
+
+  /**
+   * This method only works on controllers, if called on shard servers it will
+   * only wait itself to finish not the whole cluster.
+   */
+  private void waitForTheTableToDisengage(String cluster, String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disengage on table [" + table + "]");
+    while (true) {
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while disengaging table [" + table + "]", e);
+        throw new BException("Unknown error while disengaging table [" + table + "]", e);
+      }
+      try {
+        Map<String, Map<String, ShardState>> shardServerLayoutState = shardServerLayoutState(table);
+
+        int countNumberOfOpen = 0;
+        int countNumberOfClosing = 0;
+        for (Entry<String, Map<String, ShardState>> shardEntry : shardServerLayoutState.entrySet()) {
+          Map<String, ShardState> value = shardEntry.getValue();
+          for (ShardState state : value.values()) {
+            if (state == ShardState.OPEN) {
+              countNumberOfOpen++;
+            } else if (state == ShardState.CLOSING) {
+              countNumberOfClosing++;
+            } else if (state == ShardState.CLOSED) {
+              LOG.info("Shard [{0}] of table [{1}] now reporting closed.", shardEntry.getKey(), table);
+            } else {
+              LOG.warn("Unexpected state of [{0}] for shard [{1}].", state, shardEntry.getKey());
+            }
+          }
+        }
+        LOG.info("Closing - Shards Open [{0}], Shards Closing [{1}] of table [{2}]", countNumberOfOpen,
+            countNumberOfClosing, table);
+        if (countNumberOfOpen == 0 && countNumberOfClosing == 0) {
+          return;
+        }
+      } catch (BlurException e) {
+        LOG.info("Stilling waiting", e);
+      } catch (TException e) {
+        LOG.info("Stilling waiting", e);
+      }
+    }
+  }
+
+  private void waitForTheTableToDisable(String cluster, String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disable on table [" + table + "]");
+    while (true) {
+      if (!_clusterStatus.isEnabled(false, cluster, table)) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
+  @Override
+  public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
+    try {
+      TableContext.clear();
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.removeTable(cluster, table, deleteIndexFiles);
+    } catch (Exception e) {
+      LOG.error("Unknown error during remove of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public boolean isTableEnabled(boolean useCache, String cluster, String table) {
+    return _clusterStatus.isEnabled(useCache, cluster, table);
+  }
+
+  public void checkTable(String table) throws BlurException {
+    if (table == null) {
+      throw new BlurException("Table cannot be null.", null);
+    }
+    String cluster = _clusterStatus.getCluster(true, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+    checkTable(cluster, table);
+  }
+
+  public void checkTable(String cluster, String table) throws BlurException {
+    if (inSafeMode(true, table)) {
+      throw new BlurException("Cluster for [" + table + "] is in safe mode", null);
+    }
+    if (tableExists(true, cluster, table)) {
+      if (isTableEnabled(true, cluster, table)) {
+        return;
+      }
+      throw new BlurException("Table [" + table + "] exists, but is not enabled", null);
+    } else {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+  }
+
+  public void checkForUpdates(String table) throws BlurException {
+    String cluster = _clusterStatus.getCluster(true, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+    checkForUpdates(cluster, table);
+  }
+
+  public void checkForUpdates(String cluster, String table) throws BlurException {
+    if (_clusterStatus.isReadOnly(true, cluster, table)) {
+      throw new BlurException("Table [" + table + "] in cluster [" + cluster + "] is read only.", null);
+    }
+  }
+
+  @Override
+  public final List<String> controllerServerList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getControllerServerList();
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a controller list.", e);
+      throw new BException("Unknown error while trying to get a controller list.", e);
+    }
+  }
+
+  @Override
+  public final List<String> shardServerList(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.getShardServerList(cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a shard server list.", e);
+      throw new BException("Unknown error while trying to get a shard server list.", e);
+    }
+  }
+
+  @Override
+  public final List<String> shardClusterList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getClusterList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a cluster list.", e);
+      throw new BException("Unknown error while trying to get a cluster list.", e);
+    }
+  }
+
+  @Override
+  public final TableDescriptor describe(final String table) throws BlurException, TException {
+    try {
+      String cluster = _clusterStatus.getCluster(true, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      return _clusterStatus.getTableDescriptor(true, cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to describe a table [" + table + "].", e);
+      throw new BException("Unknown error while trying to describe a table [" + table + "].", e);
+    }
+  }
+
+  @Override
+  public final List<String> tableListByCluster(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableList(true, cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a table list by cluster [" + cluster + "].", e);
+      throw new BException("Unknown error while trying to get a table list by cluster [" + cluster + "].", e);
+    }
+  }
+
+  @Override
+  public final List<String> tableList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a table list.", e);
+      throw new BException("Unknown error while trying to get a table list.", e);
+    }
+  }
+
+  private boolean inSafeMode(boolean useCache, String table) throws BlurException {
+    String cluster = _clusterStatus.getCluster(useCache, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] not found.", null);
+    }
+    return _clusterStatus.isInSafeMode(useCache, cluster);
+  }
+
+  public boolean tableExists(boolean useCache, String cluster, String table) {
+    return _clusterStatus.exists(useCache, cluster, table);
+  }
+
+  public ClusterStatus getClusterStatus() {
+    return _clusterStatus;
+  }
+
+  public void setClusterStatus(ClusterStatus clusterStatus) {
+    _clusterStatus = clusterStatus;
+  }
+
+  public void setZookeeper(ZooKeeper zookeeper) {
+    _zookeeper = zookeeper;
+  }
+
+  public void setConfiguration(BlurConfiguration config) {
+    _configuration = config;
+  }
+
+  @Override
+  public Map<String, String> configuration() throws BlurException, TException {
+    return _configuration.getProperties();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
new file mode 100644
index 0000000..ee22f3e
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -0,0 +1,154 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_REMOTE_FETCH_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_DEFAULT_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_FETCH_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MUTATE_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurUtil.quietClose;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
+import org.apache.blur.concurrent.ThreadWatcher;
+import org.apache.blur.gui.HttpJettyServer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class ThriftBlurControllerServer extends ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftBlurControllerServer.class);
+
+  public static void main(String[] args) throws Exception {
+    int serverIndex = getServerIndex(args);
+    LOG.info("Setting up Controller Server");
+    BlurConfiguration configuration = new BlurConfiguration();
+    printUlimits();
+    ThriftServer server = createServer(serverIndex, configuration);
+    server.start();
+  }
+
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
+    String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
+    int bindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
+    bindPort += serverIndex;
+
+    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+
+    String nodeName = ThriftBlurShardServer.getNodeName(configuration, BLUR_CONTROLLER_HOSTNAME);
+    nodeName = nodeName + ":" + bindPort;
+    String zkConnectionStr = isEmpty(configuration.get(BLUR_ZOOKEEPER_CONNECTION), BLUR_ZOOKEEPER_CONNECTION);
+
+    BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
+
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
+
+    BlurUtil.setupZookeeper(zooKeeper);
+
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+
+    BlurControllerServer.BlurClient client = new BlurControllerServer.BlurClientRemote();
+
+    final BlurControllerServer controllerServer = new BlurControllerServer();
+    controllerServer.setClient(client);
+    controllerServer.setClusterStatus(clusterStatus);
+    controllerServer.setZookeeper(zooKeeper);
+    controllerServer.setNodeName(nodeName);
+    controllerServer.setRemoteFetchCount(configuration.getInt(BLUR_CONTROLLER_REMOTE_FETCH_COUNT, 100));
+    controllerServer.setMaxQueryCacheElements(configuration.getInt(BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS, 128));
+    controllerServer.setMaxTimeToLive(configuration.getLong(BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1)));
+    controllerServer.setQueryChecker(queryChecker);
+    controllerServer.setThreadCount(configuration.getInt(BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT, 64));
+    controllerServer.setMaxFetchRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES, 3));
+    controllerServer.setMaxMutateRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES, 3));
+    controllerServer.setMaxDefaultRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES, 3));
+    controllerServer.setFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_FETCH_DELAY, 500));
+    controllerServer.setMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MUTATE_DELAY, 500));
+    controllerServer.setDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_DEFAULT_DELAY, 500));
+    controllerServer.setMaxFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY, 2000));
+    controllerServer.setMaxMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY, 2000));
+    controllerServer.setMaxDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY, 2000));
+
+    controllerServer.init();
+
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(controllerServer, Iface.class);
+
+    int threadCount = configuration.getInt(BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT, 32);
+
+    final ThriftBlurControllerServer server = new ThriftBlurControllerServer();
+    server.setNodeName(nodeName);
+    server.setConfiguration(configuration);
+    server.setBindAddress(bindAddress);
+    server.setBindPort(bindPort);
+    server.setThreadCount(threadCount);
+    server.setIface(iface);
+
+    int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_CONTROLLER_PORT));
+    final HttpJettyServer httpServer;
+    if (baseGuiPort > 0) {
+      int webServerPort = baseGuiPort + serverIndex;
+      // TODO: this got ugly, there has to be a better way to handle all these
+      // params
+      // without reversing the mvn dependancy and making blur-gui on top.
+      httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1), configuration.getInt(BLUR_SHARD_BIND_PORT, -1),
+          configuration.getInt(BLUR_GUI_CONTROLLER_PORT, -1), configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "controller");
+    } else {
+      httpServer = null;
+    }
+
+    // This will shutdown the server when the correct path is set in zk
+    BlurShutdown shutdown = new BlurShutdown() {
+      @Override
+      public void shutdown() {
+        ThreadWatcher threadWatcher = ThreadWatcher.instance();
+        quietClose(server, controllerServer, clusterStatus, zooKeeper, threadWatcher, httpServer);
+      }
+    };
+    server.setShutdown(shutdown);
+    new BlurServerShutDown().register(shutdown, zooKeeper);
+    return server;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
new file mode 100644
index 0000000..c0e9706
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -0,0 +1,297 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurUtil.quietClose;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
+import org.apache.blur.concurrent.ThreadWatcher;
+import org.apache.blur.gui.HttpJettyServer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurFilterCache;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.DefaultBlurFilterCache;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.indexserver.BlurIndexWarmup;
+import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
+import org.apache.blur.manager.indexserver.DistributedIndexServer;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.metrics.JSONReporter;
+import org.apache.blur.metrics.JSONReporterServlet;
+import org.apache.blur.server.ShardServerEventHandler;
+import org.apache.blur.store.blockcache.BlockCache;
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.BlockDirectoryCache;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServlet;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.webapp.WebAppContext;
+
+public class ThriftBlurShardServer extends ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftBlurShardServer.class);
+  private static final boolean enableJsonReporter = false;
+
+  public static void main(String[] args) throws Exception {
+    int serverIndex = getServerIndex(args);
+    LOG.info("Setting up Shard Server");
+    Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
+    BlurConfiguration configuration = new BlurConfiguration();
+    printUlimits();
+    ThriftServer server = createServer(serverIndex, configuration);
+    server.start();
+  }
+
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+    // setup block cache
+    // 134,217,728 is the slab size, therefore there are 16,384 blocks
+    // in a slab when using a block size of 8,192
+    int numberOfBlocksPerSlab = 16384;
+    int blockSize = BlockDirectory.BLOCK_SIZE;
+    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, -1);
+    slabCount = getSlabCount(slabCount, numberOfBlocksPerSlab, blockSize);
+    Cache cache;
+    Configuration config = new Configuration();
+
+    String bindAddress = configuration.get(BLUR_SHARD_BIND_ADDRESS);
+    int bindPort = configuration.getInt(BLUR_SHARD_BIND_PORT, -1);
+    bindPort += serverIndex;
+
+    int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_SHARD_PORT));
+    final HttpJettyServer httpServer;
+    if (baseGuiPort > 0) {
+      int webServerPort = baseGuiPort + serverIndex;
+
+      // TODO: this got ugly, there has to be a better way to handle all these
+      // params
+      // without reversing the mvn dependancy and making blur-gui on top.
+      httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1),
+          configuration.getInt(BLUR_SHARD_BIND_PORT, -1), configuration.getInt(BLUR_GUI_CONTROLLER_PORT, -1),
+          configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "shard");
+    } else {
+      httpServer = null;
+    }
+
+    if (slabCount >= 1) {
+      BlockCache blockCache;
+      boolean directAllocation = configuration.getBoolean(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
+
+      int slabSize = numberOfBlocksPerSlab * blockSize;
+      LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount,
+          directAllocation);
+      LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes",
+          slabSize, slabCount, ((long) slabCount * (long) slabSize));
+
+      int _1024Size = configuration.getInt("blur.shard.buffercache.1024", 8192);
+      int _8192Size = configuration.getInt("blur.shard.buffercache.8192", 8192);
+      BufferStore.init(_1024Size, _8192Size);
+
+      try {
+        long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
+        blockCache = new BlockCache(directAllocation, totalMemory, slabSize);
+      } catch (OutOfMemoryError e) {
+        if ("Direct buffer memory".equals(e.getMessage())) {
+          System.err
+              .println("The max direct memory is too low.  Either increase by setting (-XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages) or disable direct allocation by (blur.shard.blockcache.direct.memory.allocation=false) in blur-site.properties");
+          System.exit(1);
+        }
+        throw e;
+      }
+      cache = new BlockDirectoryCache(blockCache);
+    } else {
+      cache = BlockDirectory.NO_CACHE;
+    }
+
+    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+
+    String nodeNameHostName = getNodeName(configuration, BLUR_SHARD_HOSTNAME);
+    String nodeName = nodeNameHostName + ":" + bindPort;
+    String zkConnectionStr = isEmpty(configuration.get(BLUR_ZOOKEEPER_CONNECTION), BLUR_ZOOKEEPER_CONNECTION);
+
+    BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
+
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
+
+    BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
+
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+
+    final BlurIndexRefresher refresher = new BlurIndexRefresher();
+    refresher.init();
+
+    BlurFilterCache filterCache = getFilterCache(configuration);
+    BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
+
+    final DistributedIndexServer indexServer = new DistributedIndexServer();
+    indexServer.setCache(cache);
+    indexServer.setClusterStatus(clusterStatus);
+    indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
+    indexServer.setConfiguration(config);
+    indexServer.setNodeName(nodeName);
+    indexServer.setShardOpenerThreadCount(configuration.getInt(BLUR_SHARD_OPENER_THREAD_COUNT, 16));
+    indexServer.setZookeeper(zooKeeper);
+    indexServer.setFilterCache(filterCache);
+    indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
+    indexServer.setWarmup(indexWarmup);
+    indexServer.init();
+
+    final IndexManager indexManager = new IndexManager();
+    indexManager.setIndexServer(indexServer);
+    indexManager.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
+    indexManager.setThreadCount(configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32));
+    indexManager.setFilterCache(filterCache);
+    indexManager.setClusterStatus(clusterStatus);
+    indexManager.init();
+
+    final BlurShardServer shardServer = new BlurShardServer();
+    shardServer.setIndexServer(indexServer);
+    shardServer.setIndexManager(indexManager);
+    shardServer.setZookeeper(zooKeeper);
+    shardServer.setClusterStatus(clusterStatus);
+    shardServer.setQueryChecker(queryChecker);
+    shardServer.setConfiguration(configuration);
+    shardServer.init();
+
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(shardServer, Iface.class);
+    if (httpServer != null) {
+      WebAppContext context = httpServer.getContext();
+      context.addServlet(new ServletHolder(new TServlet(new Blur.Processor<Blur.Iface>(iface),
+          new TJSONProtocol.Factory())), "/blur");
+      context.addServlet(new ServletHolder(new JSONReporterServlet()), "/livemetrics");
+      if (enableJsonReporter) {
+        JSONReporter.enable("json-reporter", 1, TimeUnit.SECONDS, 60);
+      }
+    }
+
+    int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
+
+    ShardServerEventHandler eventHandler = new ShardServerEventHandler();
+    
+    final ThriftBlurShardServer server = new ThriftBlurShardServer();
+    server.setNodeName(nodeName);
+    server.setBindAddress(bindAddress);
+    server.setBindPort(bindPort);
+    server.setThreadCount(threadCount);
+    server.setIface(iface);
+    server.setConfiguration(configuration);
+    server.setEventHandler(eventHandler);
+
+    // This will shutdown the server when the correct path is set in zk
+    BlurShutdown shutdown = new BlurShutdown() {
+      @Override
+      public void shutdown() {
+        ThreadWatcher threadWatcher = ThreadWatcher.instance();
+        quietClose(refresher, server, shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper,
+            httpServer);
+      }
+    };
+    server.setShutdown(shutdown);
+    new BlurServerShutDown().register(shutdown, zooKeeper);
+    return server;
+  }
+
+  private static int getSlabCount(int slabCount, int numberOfBlocksPerSlab, int blockSize) {
+    if (slabCount < 0) {
+      long slabSize = numberOfBlocksPerSlab * blockSize;
+      List<String> inputArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
+      for (String arg : inputArguments) {
+        if (arg.startsWith("-XX:MaxDirectMemorySize")) {
+          long maxDirectMemorySize = getMaxDirectMemorySize(arg);
+          maxDirectMemorySize -= 64 * 1024 * 1024;
+          return (int) (maxDirectMemorySize / slabSize);
+        }
+      }
+      throw new RuntimeException("Auto slab setup cannot happen, JVM option -XX:MaxDirectMemorySize not set.");
+    }
+    return slabCount;
+  }
+
+  private static long getMaxDirectMemorySize(String arg) {
+    int index = arg.lastIndexOf('=');
+    return parseNumber(arg.substring(index + 1).toLowerCase().replace(" ", ""));
+  }
+
+  private static long parseNumber(String number) {
+    if (number.endsWith("m")) {
+      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024;
+    } else if (number.endsWith("g")) {
+      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024 * 1024;
+    }
+    throw new RuntimeException("Cannot parse [" + number + "]");
+  }
+
+  private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurFilterCache) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurFilterCache();
+  }
+
+  private static BlurIndexWarmup getIndexWarmup(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_INDEX_WARMUP_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurIndexWarmup) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurIndexWarmup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
new file mode 100644
index 0000000..8bcad81
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -0,0 +1,207 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServerEventHandler;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerSocket;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.server.TThreadedSelectorServer;
+
+public class ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftServer.class);
+
+  private String _nodeName;
+  private Iface _iface;
+  private TServer _server;
+  private boolean _closed;
+  private BlurConfiguration _configuration;
+  private int _threadCount;
+  private int _bindPort;
+  private String _bindAddress;
+  private BlurShutdown _shutdown;
+  private ExecutorService _executorService;
+  private ExecutorService _queryExexutorService;
+  private ExecutorService _mutateExecutorService;
+  private TServerEventHandler _eventHandler;
+
+  public static void printUlimits() throws IOException {
+    ProcessBuilder processBuilder = new ProcessBuilder("ulimit", "-a");
+    Process process;
+    try {
+      process = processBuilder.start();
+    } catch (Exception e) {
+      LOG.warn("Could not run ulimit command to retrieve limits.", e);
+      return;
+    }
+
+    InputStream inputStream = process.getInputStream();
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      LOG.info("ulimit: " + line);
+    }
+    reader.close();
+  }
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _shutdown.shutdown();
+      _server.stop();
+      if (_executorService != null) {
+        _executorService.shutdownNow();
+      }
+      if (_queryExexutorService != null) {
+        _queryExexutorService.shutdownNow();
+      }
+      if (_mutateExecutorService != null) {
+        _mutateExecutorService.shutdownNow();
+      }
+    }
+  }
+
+  protected static int getServerIndex(String[] args) {
+    for (int i = 0; i < args.length; i++) {
+      if ("-s".equals(args[i])) {
+        if (i + 1 < args.length) {
+          return Integer.parseInt(args[i + 1]);
+        }
+      }
+    }
+    return 0;
+  }
+
+  public void start() throws TTransportException {
+    _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
+    Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
+
+    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
+    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
+    args.processor(processor);
+    args.executorService(_executorService);
+    args.transportFactory(new TFramedTransport.Factory());
+    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
+    _server = new TThreadedSelectorServer(args);
+    _server.setServerEventHandler(_eventHandler);
+    LOG.info("Starting server [{0}]", _nodeName);
+    _server.serve();
+  }
+
+  public InetSocketAddress getBindInetSocketAddress(BlurConfiguration configuration) {
+    return new InetSocketAddress(_bindAddress, _bindPort);
+  }
+
+  public static String isEmpty(String str, String name) {
+    if (str == null || str.trim().isEmpty()) {
+      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
+    }
+    return str;
+  }
+
+  public Iface getIface() {
+    return _iface;
+  }
+
+  public void setIface(Iface iface) {
+    this._iface = iface;
+  }
+
+  public String getNodeName() {
+    return _nodeName;
+  }
+
+  public void setNodeName(String nodeName) {
+    this._nodeName = nodeName;
+  }
+
+  public void setConfiguration(BlurConfiguration configuration) {
+    this._configuration = configuration;
+  }
+
+  public static String getNodeName(BlurConfiguration configuration, String hostNameProperty)
+      throws UnknownHostException {
+    String hostName = configuration.get(hostNameProperty);
+    if (hostName == null) {
+      hostName = "";
+    }
+    hostName = hostName.trim();
+    if (hostName.isEmpty()) {
+      try {
+        return InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        String message = e.getMessage();
+        int index = message.indexOf(':');
+        if (index < 0) {
+          throw new RuntimeException("Nodename cannot be determined.");
+        }
+        String nodeName = message.substring(0, index);
+        LOG.warn("Hack to get nodename from exception [" + nodeName + "]");
+        return nodeName;
+      }
+    }
+    return hostName;
+  }
+
+  public void setBindPort(int bindPort) {
+    _bindPort = bindPort;
+  }
+
+  public void setBindAddress(String bindAddress) {
+    _bindAddress = bindAddress;
+  }
+
+  public void setThreadCount(int threadCount) {
+    this._threadCount = threadCount;
+  }
+
+  public BlurShutdown getShutdown() {
+    return _shutdown;
+  }
+
+  public void setShutdown(BlurShutdown shutdown) {
+    this._shutdown = shutdown;
+  }
+
+  public TServerEventHandler getEventHandler() {
+    return _eventHandler;
+  }
+
+  public void setEventHandler(TServerEventHandler eventHandler) {
+    _eventHandler = eventHandler;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
new file mode 100644
index 0000000..3f109b4
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -0,0 +1,117 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.results.BlurResultComparator;
+import org.apache.blur.manager.results.BlurResultPeekableIteratorComparator;
+import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.lucene.index.Term;
+
+
+public class BlurConstants {
+
+  public static final String CONTROLLER = "controller";
+  public static final String SHARD = "shard";
+  public static final String SHARD_PREFIX = "shard-";
+  public static final Comparator<? super PeekableIterator<BlurResult>> HITS_PEEKABLE_ITERATOR_COMPARATOR = new BlurResultPeekableIteratorComparator();
+  public static final Comparator<? super BlurResult> HITS_COMPARATOR = new BlurResultComparator();
+
+  public static final String PRIME_DOC = "_prime_";
+  public static final String PRIME_DOC_VALUE = "true";
+  public static final String ROW_ID = "rowid";
+  public static final String RECORD_ID = "recordid";
+  public static final String FAMILY = "family";
+  public static final String SUPER = "super";
+  public static final String SEP = ".";
+
+  public static final String BLUR_TABLE_PATH = "blur.table.path";
+  public static final String BLUR_ZOOKEEPER_CONNECTION = "blur.zookeeper.connection";
+  public static final String BLUR_SHARD_HOSTNAME = "blur.shard.hostname";
+  public static final String BLUR_SHARD_BIND_PORT = "blur.shard.bind.port";
+  public static final String BLUR_SHARD_BIND_ADDRESS = "blur.shard.bind.address";
+  public static final String BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "blur.shard.blockcache.direct.memory.allocation";
+  public static final String BLUR_SHARD_BLOCKCACHE_SLAB_COUNT = "blur.shard.blockcache.slab.count";
+  public static final String BLUR_SHARD_SAFEMODEDELAY = "blur.shard.safemodedelay";
+  public static final String BLUR_CONTROLLER_HOSTNAME = "blur.controller.hostname";
+  public static final String BLUR_CONTROLLER_BIND_PORT = "blur.controller.bind.port";
+  public static final String BLUR_CONTROLLER_BIND_ADDRESS = "blur.controller.bind.address";
+  public static final String BLUR_QUERY_MAX_ROW_FETCH = "blur.query.max.row.fetch";
+  public static final String BLUR_QUERY_MAX_RECORD_FETCH = "blur.query.max.record.fetch";
+  public static final String BLUR_QUERY_MAX_RESULTS_FETCH = "blur.query.max.results.fetch";
+
+  public static final String BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT = "blur.shard.server.thrift.thread.count";
+  public static final String BLUR_SHARD_CACHE_MAX_TIMETOLIVE = "blur.shard.cache.max.timetolive";
+  public static final String BLUR_SHARD_FILTER_CACHE_CLASS = "blur.shard.filter.cache.class";
+  public static final String BLUR_SHARD_INDEX_WARMUP_CLASS = "blur.shard.index.warmup.class";
+  public static final String BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT = "blur.indexmanager.search.thread.count";
+  public static final String BLUR_SHARD_DATA_FETCH_THREAD_COUNT = "blur.shard.data.fetch.thread.count";
+  public static final String BLUR_MAX_CLAUSE_COUNT = "blur.max.clause.count";
+  public static final String BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.shard.cache.max.querycache.elements";
+  public static final String BLUR_SHARD_OPENER_THREAD_COUNT = "blur.shard.opener.thread.count";
+  public static final String BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE = "blur.shard.index.deletion.policy.maxage";
+  public static final String BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE = "blur.zookeeper.system.time.tolerance";
+  public static final String BLUR_SAHRD_INDEX_SIMILARITY = "blur.sahrd.index.similarity";
+
+  public static final String BLUR_SHARD_TIME_BETWEEN_COMMITS = "blur.shard.time.between.commits";
+  public static final String BLUR_SHARD_TIME_BETWEEN_REFRESHS = "blur.shard.time.between.refreshs";
+
+  public static final String BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT = "blur.controller.server.thrift.thread.count";
+  public static final String BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT = "blur.controller.server.remote.thread.count";
+  public static final String BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE = "blur.controller.cache.max.timetolive";
+  public static final String BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.controller.cache.max.querycache.elements";
+  public static final String BLUR_CONTROLLER_REMOTE_FETCH_COUNT = "blur.controller.remote.fetch.count";
+
+  public static final String BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES = "blur.controller.retry.max.mutate.retries";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES = "blur.controller.retry.max.default.retries";
+  public static final String BLUR_CONTROLLER_RETRY_FETCH_DELAY = "blur.controller.retry.fetch.delay";
+  public static final String BLUR_CONTROLLER_RETRY_DEFAULT_DELAY = "blur.controller.retry.default.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MUTATE_DELAY = "blur.controller.retry.mutate.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY = "blur.controller.retry.max.fetch.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY = "blur.controller.retry.max.mutate.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY = "blur.controller.retry.max.default.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES = "blur.controller.retry.max.fetch.retries";
+
+  public static final String BLUR_GUI_CONTROLLER_PORT = "blur.gui.controller.port";
+  public static final String BLUR_GUI_SHARD_PORT = "blur.gui.shard.port";
+
+  public static final String DEFAULT = "default";
+  public static final String BLUR_CLUSTER_NAME = "blur.cluster.name";
+  public static final String BLUR_CLUSTER;
+
+  public static final long ZK_WAIT_TIME = TimeUnit.SECONDS.toMillis(5);
+
+  public static final Term PRIME_DOC_TERM = new Term(PRIME_DOC, BlurConstants.PRIME_DOC_VALUE);
+
+  static {
+    try {
+      BlurConfiguration configuration = new BlurConfiguration();
+      BLUR_CLUSTER = configuration.get(BLUR_CLUSTER_NAME, DEFAULT);
+    } catch (IOException e) {
+      throw new RuntimeException("Unknown error parsing configuration.", e);
+    }
+  }
+
+  public static int getPid() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java b/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
new file mode 100644
index 0000000..44327da
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
@@ -0,0 +1,135 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.BlurException;
+
+
+public class BlurExecutorCompletionService<T> extends ExecutorCompletionService<T> {
+
+  private AtomicInteger count = new AtomicInteger(0);
+  private Collection<Future<T>> _bag;
+  private Cancel _cancel;
+
+  public interface Cancel {
+    void cancel();
+  }
+
+  public BlurExecutorCompletionService(Executor executor, Cancel cancel) {
+    super(executor);
+    _bag = Collections.synchronizedCollection(new HashSet<Future<T>>());
+    _cancel = cancel;
+  }
+
+  public void cancelAll() {
+    for (Future<T> future : _bag) {
+      future.cancel(true);
+    }
+    _cancel.cancel();
+  }
+
+  private Future<T> remember(Future<T> future) {
+    _bag.add(future);
+    return future;
+  }
+
+  private Future<T> forget(Future<T> future) {
+    _bag.remove(future);
+    return future;
+  }
+
+  public int getRemainingCount() {
+    return count.get();
+  }
+
+  @Override
+  public Future<T> poll() {
+    Future<T> poll = super.poll();
+    if (poll != null) {
+      count.decrementAndGet();
+    }
+    return forget(poll);
+  }
+
+  @Override
+  public Future<T> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    Future<T> poll = super.poll(timeout, unit);
+    if (poll != null) {
+      count.decrementAndGet();
+    }
+    return forget(poll);
+  }
+
+  @Override
+  public Future<T> submit(Callable<T> task) {
+    Future<T> future = super.submit(task);
+    count.incrementAndGet();
+    return remember(future);
+  }
+
+  @Override
+  public Future<T> submit(Runnable task, T result) {
+    Future<T> future = super.submit(task, result);
+    count.incrementAndGet();
+    return remember(future);
+  }
+
+  @Override
+  public Future<T> take() throws InterruptedException {
+    Future<T> take = super.take();
+    if (take != null) {
+      count.decrementAndGet();
+    }
+    return forget(take);
+  }
+
+  public Future<T> poll(long timeout, TimeUnit unit, boolean throwExceptionIfTimeout, Object... parameters) throws BlurException {
+    try {
+      Future<T> future = poll(timeout, unit);
+      if (future == null) {
+        throw new BException("Call timeout [{0}]", Arrays.asList(parameters));
+      }
+      return future;
+    } catch (InterruptedException e) {
+      throw new BException("Call interrupted [{0}]", e, Arrays.asList(parameters));
+    }
+  }
+
+  public T getResultThrowException(Future<T> future, Object... parameters) throws BlurException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw new BException("Call interrupted [{0}]", e, Arrays.asList(parameters));
+    } catch (ExecutionException e) {
+      throw new BException("Call execution exception [{0}]", e.getCause(), Arrays.asList(parameters));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java b/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
new file mode 100644
index 0000000..6323af5
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
@@ -0,0 +1,46 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+
+public class BlurThriftRecord extends Record implements ReaderBlurRecord {
+
+  private static final long serialVersionUID = 1447192115360284850L;
+
+  @Override
+  public void addColumn(String name, String value) {
+    addToColumns(new Column(name, value));
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    // do nothing
+  }
+
+}


Mime
View raw message