incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [12/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
new file mode 100644
index 0000000..171b3b6
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
@@ -0,0 +1,170 @@
+package org.apache.blur.manager.writer.lucene;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.lucene.index.FilterIndexReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermPositions;
+
+public class SnapshotIndexReader extends FilterIndexReader {
+
+  private final int maxDocs;
+  private final int numDocs;
+
+  public SnapshotIndexReader(IndexReader in, int maxDocs) {
+    super(in);
+    this.numDocs = in.numDocs();
+    this.maxDocs = maxDocs;
+  }
+
+  public static IndexReader wrap(IndexReader reader, int maxDocs) throws IOException {
+    IndexReader[] readers = reader.getSequentialSubReaders();
+    if (readers == null) {
+      return wrapInternal(reader, maxDocs);
+    } else {
+      IndexReader[] result = new IndexReader[readers.length];
+      for (int i = 0; i < readers.length; i++) {
+        result[i] = wrapInternal(readers[i], maxDocs);
+      }
+      return new MultiReader(result, false);
+    }
+  }
+
+  private static IndexReader wrapInternal(IndexReader reader, int maxDocs) throws IOException {
+    return new SnapshotIndexReader(reader, maxDocs);
+  }
+
+  @Override
+  public IndexReader[] getSequentialSubReaders() {
+    return null;
+  }
+
+  @Override
+  public int numDocs() {
+    return numDocs;
+  }
+
+  @Override
+  public int maxDoc() {
+    return maxDocs;
+  }
+
+  @Override
+  public TermDocs termDocs() throws IOException {
+    return new SnapshotTermDocs(in.termDocs(), maxDocs);
+  }
+
+  public TermDocs termDocs(Term term) throws IOException {
+    ensureOpen();
+    TermDocs termDocs = termDocs();
+    termDocs.seek(term);
+    return termDocs;
+  }
+
+  @Override
+  public TermPositions termPositions() throws IOException {
+    return new SnapshotTermPositions(in.termPositions(), maxDocs);
+  }
+
+  // public TermPositions termPositions(Term term) throws IOException {
+  // ensureOpen();
+  // TermPositions termPositions = termPositions();
+  // termPositions.seek(term);
+  // return termPositions;
+  // }
+
+  public static class SnapshotTermPositions extends FilterTermPositions {
+
+    private final int maxDocs;
+
+    public SnapshotTermPositions(TermPositions termPositions, int maxDocs) {
+      super(termPositions);
+      this.maxDocs = maxDocs;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      boolean next = super.next();
+      if (next) {
+        if (doc() >= maxDocs) {
+          return false;
+        }
+      }
+      return next;
+    }
+
+    @Override
+    public int read(int[] docs, int[] freqs) throws IOException {
+      int read = super.read(docs, freqs);
+      if (read == 0) {
+        return 0;
+      }
+      if (doc() >= maxDocs) {
+        return checkResults(docs, maxDocs);
+      }
+      return read;
+    }
+  }
+
+  public static class SnapshotTermDocs extends FilterTermDocs {
+
+    private final int maxDocs;
+
+    public SnapshotTermDocs(TermDocs termDocs, int maxDocs) {
+      super(termDocs);
+      this.maxDocs = maxDocs;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      boolean next = super.next();
+      if (next) {
+        if (doc() >= maxDocs) {
+          return false;
+        }
+      }
+      return next;
+    }
+
+    @Override
+    public int read(int[] docs, int[] freqs) throws IOException {
+      int read = super.read(docs, freqs);
+      if (read == 0) {
+        return 0;
+      }
+      if (doc() >= maxDocs) {
+        return checkResults(docs, maxDocs);
+      }
+      return read;
+    }
+  }
+
+  private static int checkResults(int[] docs, int maxDocs) {
+    int length = docs.length;
+    for (int i = 0; i < length; i++) {
+      if (docs[i] >= maxDocs) {
+        return i;
+      }
+    }
+    return length;
+  }
+}

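For context, here is a minimal usage sketch of the SnapshotIndexReader.wrap(...) helper added above. It assumes Lucene 3.x style APIs (consistent with the TermDocs/TermPositions usage in this file); the index directory, field name, class name, and the point at which maxDoc is captured are illustrative only and not part of this commit.

  // Illustrative only: freeze the visible doc range at a recorded maxDoc and search it.
  import java.io.File;

  import org.apache.blur.manager.writer.lucene.SnapshotIndexReader;
  import org.apache.lucene.index.IndexReader;
  import org.apache.lucene.index.Term;
  import org.apache.lucene.search.IndexSearcher;
  import org.apache.lucene.search.TermQuery;
  import org.apache.lucene.search.TopDocs;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;

  public class SnapshotReaderExample {
    public static void main(String[] args) throws Exception {
      Directory dir = FSDirectory.open(new File("/tmp/example-index")); // illustrative path
      IndexReader reader = IndexReader.open(dir, true);
      // In practice the doc count would be recorded earlier, before new docs were appended.
      int snapshotMaxDoc = reader.maxDoc();
      IndexReader snapshot = SnapshotIndexReader.wrap(reader, snapshotMaxDoc);
      IndexSearcher searcher = new IndexSearcher(snapshot);
      // Docs appended past snapshotMaxDoc are invisible to this search.
      TopDocs hits = searcher.search(new TermQuery(new Term("rowid", "row-1")), 10);
      System.out.println("hits within snapshot: " + hits.totalHits);
      searcher.close();
      reader.close();
    }
  }
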
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
new file mode 100644
index 0000000..f81e619
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
@@ -0,0 +1,213 @@
+package org.apache.blur.manager.writer.lucene;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.index.FilterIndexReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermPositions;
+import org.apache.lucene.util.BitVector;
+
+public class SoftDeleteIndexReader extends FilterIndexReader {
+
+  private final boolean baseHasDeletions;
+  private final BitVector deletes;
+  private final int deleteCount;
+  private final int numDocs;
+
+  public SoftDeleteIndexReader(IndexReader in, BitVector deletes) {
+    super(in);
+    this.baseHasDeletions = in.hasDeletions();
+    if (deletes == null) {
+      throw new RuntimeException("No deletes, use regular indexreader.");
+    }
+    this.deletes = deletes;
+    this.deleteCount = deletes.count();
+    this.numDocs = in.numDocs() - deleteCount;
+  }
+
+  public static IndexReader wrap(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
+    IndexReader[] readers = reader.getSequentialSubReaders();
+    if (readers == null) {
+      return wrapInternal(reader, deleteTerms);
+    } else {
+      IndexReader[] result = new IndexReader[readers.length];
+      for (int i = 0; i < readers.length; i++) {
+        result[i] = wrapInternal(readers[i], deleteTerms);
+      }
+      return new MultiReader(result, false);
+    }
+  }
+
+  private static IndexReader wrapInternal(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
+    BitVector deletes = getDeletes(reader, deleteTerms);
+    if (deletes == null) {
+      return reader;
+    }
+    return new SoftDeleteIndexReader(reader, deletes);
+  }
+
+  private static BitVector getDeletes(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
+    BitVector deletes = null;
+    TermDocs termDocs = reader.termDocs();
+    for (Term t : deleteTerms) {
+      termDocs.seek(t);
+      while (termDocs.next()) {
+        if (deletes == null) {
+          deletes = new BitVector(reader.maxDoc());
+        }
+        int doc = termDocs.doc();
+        deletes.set(doc);
+      }
+    }
+    termDocs.close();
+    return deletes;
+  }
+
+  @Override
+  public IndexReader[] getSequentialSubReaders() {
+    return null;
+  }
+
+  @Override
+  public int numDocs() {
+    return numDocs;
+  }
+
+  @Override
+  public boolean isDeleted(int n) {
+    if (baseHasDeletions && in.isDeleted(n)) {
+      return true;
+    }
+    return deletes.get(n);
+  }
+
+  @Override
+  public boolean hasDeletions() {
+    return baseHasDeletions;
+  }
+
+  @Override
+  public TermDocs termDocs() throws IOException {
+    return new SoftDeleteTermDocs(in.termDocs(), deletes);
+  }
+
+  public TermDocs termDocs(Term term) throws IOException {
+    ensureOpen();
+    TermDocs termDocs = termDocs();
+    termDocs.seek(term);
+    return termDocs;
+  }
+
+  @Override
+  public TermPositions termPositions() throws IOException {
+    return new SoftDeleteTermPositions(in.termPositions(), deletes);
+  }
+
+  // public TermPositions termPositions(Term term) throws IOException {
+  // ensureOpen();
+  // TermPositions termPositions = termPositions();
+  // termPositions.seek(term);
+  // return termPositions;
+  // }
+
+  public static class SoftDeleteTermPositions extends FilterTermPositions {
+
+    private BitVector deletes;
+
+    public SoftDeleteTermPositions(TermPositions termPositions, BitVector deletes) {
+      super(termPositions);
+      this.deletes = deletes;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      while (super.next()) {
+        if (!deletes.get(doc())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int read(int[] docs, int[] freqs) throws IOException {
+      int read = super.read(docs, freqs);
+      if (read == 0) {
+        return 0;
+      }
+      int validResults = removeDeletes(docs, freqs, read, deletes);
+      if (validResults == 0) {
+        return read(docs, freqs);
+      }
+      return validResults;
+    }
+  }
+
+  public static class SoftDeleteTermDocs extends FilterTermDocs {
+
+    private BitVector deletes;
+
+    public SoftDeleteTermDocs(TermDocs termDocs, BitVector deletes) {
+      super(termDocs);
+      this.deletes = deletes;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      while (super.next()) {
+        if (!deletes.get(doc())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int read(int[] docs, int[] freqs) throws IOException {
+      int read = super.read(docs, freqs);
+      if (read == 0) {
+        return 0;
+      }
+      int validResults = removeDeletes(docs, freqs, read, deletes);
+      if (validResults == 0) {
+        return read(docs, freqs);
+      }
+      return validResults;
+    }
+  }
+
+  private static int removeDeletes(int[] docs, int[] freqs, int validLength, BitVector deletes) {
+    int targetPosition = 0;
+    for (int i = 0; i < validLength; i++) {
+      int doc = docs[i];
+      if (!deletes.get(doc)) {
+        if (targetPosition != i) {
+          docs[targetPosition] = doc;
+          freqs[targetPosition] = freqs[i];
+        }
+        targetPosition++;
+      }
+    }
+    return targetPosition;
+  }
+}

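Similarly, a minimal usage sketch of SoftDeleteIndexReader.wrap(...) from the file above, again assuming Lucene 3.x style APIs; the index location, field names, delete terms, and class name are illustrative only.

  // Illustrative only: hide rows matching the delete terms without rewriting any segments.
  import java.io.File;
  import java.util.Arrays;
  import java.util.Collection;

  import org.apache.blur.manager.writer.lucene.SoftDeleteIndexReader;
  import org.apache.lucene.index.IndexReader;
  import org.apache.lucene.index.Term;
  import org.apache.lucene.search.IndexSearcher;
  import org.apache.lucene.search.TermQuery;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;

  public class SoftDeleteReaderExample {
    public static void main(String[] args) throws Exception {
      Directory dir = FSDirectory.open(new File("/tmp/example-index")); // illustrative path
      IndexReader reader = IndexReader.open(dir, true);
      // Rows marked for deletion by term; the wrapped reader filters out matching docs.
      Collection<Term> deleteTerms = Arrays.asList(new Term("rowid", "row-1"), new Term("rowid", "row-2"));
      IndexReader softDeleted = SoftDeleteIndexReader.wrap(reader, deleteTerms);
      IndexSearcher searcher = new IndexSearcher(softDeleted);
      int visible = searcher.search(new TermQuery(new Term("family.column", "value")), 10).totalHits;
      System.out.println("visible hits after soft deletes: " + visible);
      searcher.close();
      reader.close();
    }
  }
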
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
new file mode 100644
index 0000000..2ce0fe5
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -0,0 +1,811 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableClient;
+import org.apache.blur.manager.results.MergerBlurResultIterable;
+import org.apache.blur.manager.stats.MergerTableStats;
+import org.apache.blur.manager.status.MergerQueryStatus;
+import org.apache.blur.manager.status.MergerQueryStatusSingle;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.QueryCache;
+import org.apache.blur.utils.QueryCacheEntry;
+import org.apache.blur.utils.QueryCacheKey;
+import org.apache.blur.utils.ForkJoin.Merger;
+import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchNodeExistance;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+
+public class BlurControllerServer extends TableAdmin implements Iface {
+
+  public static abstract class BlurClient {
+    public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception;
+  }
+
+  public static class BlurClientRemote extends BlurClient {
+    @Override
+    public <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception {
+      return BlurClientManager.execute(node, command, maxRetries, backOffTime, maxBackOffTime);
+    }
+  }
+
+  private static final String CONTROLLER_THREAD_POOL = "controller-thread-pool";
+  private static final Log LOG = LogFactory.getLog(BlurControllerServer.class);
+
+  private ExecutorService _executor;
+  private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout = new AtomicReference<Map<String, Map<String, String>>>(new HashMap<String, Map<String, String>>());
+  private BlurClient _client;
+  private int _threadCount = 64;
+  private AtomicBoolean _closed = new AtomicBoolean();
+  private Map<String, Integer> _tableShardCountMap = new ConcurrentHashMap<String, Integer>();
+  private BlurPartitioner<BytesWritable, Void> _blurPartitioner = new BlurPartitioner<BytesWritable, Void>();
+  private String _nodeName;
+  private int _remoteFetchCount = 100;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private QueryCache _queryCache;
+  private BlurQueryChecker _queryChecker;
+  private AtomicBoolean _running = new AtomicBoolean();
+
+  private int _maxFetchRetries = 3;
+  private int _maxMutateRetries = 3;
+  private int _maxDefaultRetries = 3;
+  private long _fetchDelay = 500;
+  private long _mutateDelay = 500;
+  private long _defaultDelay = 500;
+  private long _maxFetchDelay = 2000;
+  private long _maxMutateDelay = 2000;
+  private long _maxDefaultDelay = 2000;
+
+  private long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
+  private WatchChildren _watchForClusters;
+  private ConcurrentMap<String, WatchNodeExistance> _watchForTablesPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchNodeExistance> _watchForOnlineShardsPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchChildren> _watchForTablesPerCluster = new ConcurrentHashMap<String, WatchChildren>();
+  private ConcurrentMap<String, WatchChildren> _watchForOnlineShardsPerCluster = new ConcurrentHashMap<String, WatchChildren>();
+
+  public void init() throws KeeperException, InterruptedException {
+    setupZookeeper();
+    registerMyself();
+    _queryCache = new QueryCache("controller-cache", _maxQueryCacheElements, _maxTimeToLive);
+    _executor = Executors.newThreadPool(CONTROLLER_THREAD_POOL, _threadCount);
+    _running.set(true);
+    watchForClusterChanges();
+    List<String> clusterList = _clusterStatus.getClusterList(false);
+    for (String cluster : clusterList) {
+      watchForLayoutChanges(cluster);
+    }
+    updateLayout();
+  }
+
+  private void setupZookeeper() throws KeeperException, InterruptedException {
+    BlurUtil.createIfMissing(_zookeeper, "/blur");
+    BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getOnlineControllersPath());
+    BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getClustersPath());
+  }
+
+  private void watchForClusterChanges() throws KeeperException, InterruptedException {
+    _watchForClusters = new WatchChildren(_zookeeper, ZookeeperPathConstants.getClustersPath());
+    _watchForClusters.watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        for (String cluster : children) {
+          try {
+            watchForLayoutChanges(cluster);
+          } catch (KeeperException e) {
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    });
+  }
+
+  private void watchForLayoutChanges(final String cluster) throws KeeperException, InterruptedException {
+    WatchNodeExistance we1 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
+    we1.watch(new WatchNodeExistance.OnChange() {
+      @Override
+      public void action(Stat stat) {
+        if (stat != null) {
+          watch(cluster, ZookeeperPathConstants.getTablesPath(cluster), _watchForTablesPerCluster);
+        }
+      }
+    });
+    if (_watchForTablesPerClusterExistance.putIfAbsent(cluster, we1) != null) {
+      we1.close();
+    }
+
+    WatchNodeExistance we2 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
+    we2.watch(new WatchNodeExistance.OnChange() {
+      @Override
+      public void action(Stat stat) {
+        if (stat != null) {
+          watch(cluster, ZookeeperPathConstants.getOnlineShardsPath(cluster), _watchForOnlineShardsPerCluster);
+        }
+      }
+    });
+    if (_watchForOnlineShardsPerClusterExistance.putIfAbsent(cluster, we2) != null) {
+      we2.close();
+    }
+  }
+
+  private void watch(String cluster, String path, ConcurrentMap<String, WatchChildren> map) {
+    WatchChildren watchForTables = new WatchChildren(_zookeeper, path);
+    watchForTables.watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        LOG.info("Layout change.");
+        updateLayout();
+      }
+    });
+
+    if (map.putIfAbsent(cluster, watchForTables) != null) {
+      watchForTables.close();
+    }
+  }
+
+  private synchronized void updateLayout() {
+    if (!_clusterStatus.isOpen()) {
+      LOG.warn("The cluster status object has been closed.");
+      return;
+    }
+    List<String> tableList = _clusterStatus.getTableList(false);
+    HashMap<String, Map<String, String>> newLayout = new HashMap<String, Map<String, String>>();
+    for (String table : tableList) {
+      DistributedLayoutManager layoutManager = new DistributedLayoutManager();
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        continue;
+      }
+      List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
+      List<String> offlineShardServers = _clusterStatus.getOfflineShardServers(false, cluster);
+      List<String> shardList = getShardList(cluster, table);
+      layoutManager.setNodes(shardServerList);
+      layoutManager.setNodesOffline(offlineShardServers);
+      layoutManager.setShards(shardList);
+      layoutManager.init();
+      Map<String, String> layout = layoutManager.getLayout();
+      newLayout.put(table, layout);
+    }
+    _shardServerLayout.set(newLayout);
+  }
+
+  private List<String> getShardList(String cluster, String table) {
+    List<String> shards = new ArrayList<String>();
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    for (int i = 0; i < tableDescriptor.shardCount; i++) {
+      shards.add(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
+    }
+    return shards;
+  }
+
+  private void registerMyself() {
+    try {
+      String onlineControllerPath = ZookeeperPathConstants.getOnlineControllersPath() + "/" + _nodeName;
+      while (_zookeeper.exists(onlineControllerPath, false) != null) {
+        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", _nodeName, onlineControllerPath);
+        Thread.sleep(3000);
+      }
+      String version = BlurUtil.getVersion();
+      _zookeeper.create(onlineControllerPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public synchronized void close() {
+    if (!_closed.get()) {
+      _closed.set(true);
+      _running.set(false);
+      _executor.shutdownNow();
+      close(_watchForClusters);
+      close(_watchForOnlineShardsPerCluster.values());
+      close(_watchForOnlineShardsPerClusterExistance.values());
+      close(_watchForTablesPerCluster.values());
+      close(_watchForTablesPerClusterExistance.values());
+    }
+  }
+
+  private void close(Collection<? extends Closeable> closableLst) {
+    for (Closeable closeable : closableLst) {
+      close(closeable);
+    }
+  }
+
+  private void close(Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.error("Unknown", e);
+    }
+  }
+
+  @Override
+  public BlurResults query(final String table, final BlurQuery blurQuery) throws BlurException, TException {
+    // @TODO make this faster
+    checkTable(table);
+    String cluster = _clusterStatus.getCluster(true, table);
+    _queryChecker.checkQuery(blurQuery);
+    int shardCount = _clusterStatus.getShardCount(true, cluster, table);
+
+    OUTER: for (int retries = 0; retries < _maxDefaultRetries; retries++) {
+      try {
+        final AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
+
+        BlurQuery original = new BlurQuery(blurQuery);
+        if (blurQuery.useCacheIfPresent) {
+          LOG.debug("Using cache for query [{0}] on table [{1}].", blurQuery, table);
+          QueryCacheKey key = QueryCache.getNormalizedBlurQueryKey(table, blurQuery);
+          QueryCacheEntry queryCacheEntry = _queryCache.get(key);
+          if (_queryCache.isValid(queryCacheEntry)) {
+            LOG.debug("Cache hit for query [{0}] on table [{1}].", blurQuery, table);
+            return queryCacheEntry.getBlurResults(blurQuery);
+          } else {
+            _queryCache.remove(key);
+          }
+        }
+
+        BlurUtil.setStartTime(original);
+
+        Selector selector = blurQuery.getSelector();
+        blurQuery.setSelector(null);
+
+        BlurResultIterable hitsIterable = scatterGather(getCluster(table), new BlurCommand<BlurResultIterable>() {
+          @Override
+          public BlurResultIterable call(Client client) throws BlurException, TException {
+            return new BlurResultIterableClient(client, table, blurQuery, facetCounts, _remoteFetchCount);
+          }
+        }, new MergerBlurResultIterable(blurQuery));
+        BlurResults results = BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _executor, selector, this, table);
+        if (!validResults(results, shardCount, blurQuery)) {
+          BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
+          continue OUTER;
+        }
+        return _queryCache.cache(table, original, results);
+      } catch (Exception e) {
+        LOG.error("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
+        throw new BException("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
+      }
+    }
+    throw new BlurException("Query could not be completed.", null);
+  }
+
+  private boolean validResults(BlurResults results, int shardCount, BlurQuery query) {
+    if (results.totalResults >= query.minimumNumberOfResults) {
+      return true;
+    }
+    int shardInfoSize = results.getShardInfoSize();
+    if (shardInfoSize == shardCount) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public FetchResult fetchRow(final String table, final Selector selector) throws BlurException, TException {
+    checkTable(table);
+    IndexManager.validSelector(selector);
+    String clientHostnamePort = null;
+    try {
+      clientHostnamePort = getNode(table, selector);
+      return _client.execute(clientHostnamePort, new BlurCommand<FetchResult>() {
+        @Override
+        public FetchResult call(Client client) throws BlurException, TException {
+          return client.fetchRow(table, selector);
+        }
+      }, _maxFetchRetries, _fetchDelay, _maxFetchDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
+      throw new BException("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
+    }
+  }
+
+  @Override
+  public void cancelQuery(final String table, final long uuid) throws BlurException, TException {
+    checkTable(table);
+    try {
+      scatter(getCluster(table), new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.cancelQuery(table, uuid);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
+      throw new BException("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
+    }
+  }
+
+  @Override
+  public List<BlurQueryStatus> currentQueries(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<BlurQueryStatus>>() {
+        @Override
+        public List<BlurQueryStatus> call(Client client) throws BlurException, TException {
+          return client.currentQueries(table);
+        }
+      }, new MergerQueryStatus(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current searches [{0}]", e, table);
+      throw new BException("Unknown error while trying to get current searches [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public List<Long> queryStatusIdList(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<Long>>() {
+        @Override
+        public List<Long> call(Client client) throws BlurException, TException {
+          return client.queryStatusIdList(table);
+        }
+      }, new Merger<List<Long>>() {
+        @Override
+        public List<Long> merge(BlurExecutorCompletionService<List<Long>> service) throws BlurException {
+          Set<Long> result = new HashSet<Long>();
+          while (service.getRemainingCount() > 0) {
+            Future<List<Long>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+            List<Long> ids = service.getResultThrowException(future);
+            result.addAll(ids);
+          }
+          return new ArrayList<Long>(result);
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status ids for table [{0}]", e, table);
+      throw new BException("Unknown error while trying to get query status ids for table [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public BlurQueryStatus queryStatusById(final String table, final long uuid) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<BlurQueryStatus>() {
+        @Override
+        public BlurQueryStatus call(Client client) throws BlurException, TException {
+          return client.queryStatusById(table, uuid);
+        }
+      }, new MergerQueryStatusSingle(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status [{0}]", e, table, uuid);
+      throw new BException("Unknown error while trying to get query status [{0}]", e, table, uuid);
+    }
+  }
+
+  @Override
+  public TableStats tableStats(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<TableStats>() {
+        @Override
+        public TableStats call(Client client) throws BlurException, TException {
+          return client.getTableStats(table);
+        }
+      }, new MergerTableStats(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [{0}]", e, table);
+      throw new BException("Unknown error while trying to get table stats [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public Map<String, String> shardServerLayout(String table) throws BlurException, TException {
+    checkTable(table);
+    Map<String, Map<String, String>> layout = _shardServerLayout.get();
+    Map<String, String> tableLayout = layout.get(table);
+    if (tableLayout == null) {
+      return new HashMap<String, String>();
+    }
+    return tableLayout;
+  }
+
+  @Override
+  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Long>() {
+        @Override
+        public Long call(Client client) throws BlurException, TException {
+          return client.recordFrequency(table, columnFamily, columnName, value);
+        }
+      }, new Merger<Long>() {
+
+        @Override
+        public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
+          Long total = 0L;
+          while (service.getRemainingCount() > 0) {
+            Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, value);
+            total += service.getResultThrowException(future, table, columnFamily, columnName, value);
+          }
+          return total;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
+      throw new BException("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
+    }
+  }
+
+  @Override
+  public Schema schema(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Schema>() {
+        @Override
+        public Schema call(Client client) throws BlurException, TException {
+          return client.schema(table);
+        }
+      }, new Merger<Schema>() {
+        @Override
+        public Schema merge(BlurExecutorCompletionService<Schema> service) throws BlurException {
+          Schema result = null;
+          while (service.getRemainingCount() > 0) {
+            Future<Schema> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table);
+            Schema schema = service.getResultThrowException(future, table);
+            if (result == null) {
+              result = schema;
+            } else {
+              result = BlurControllerServer.merge(result, schema);
+            }
+          }
+          return result;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to schema table [{0}]", e, table);
+      throw new BException("Unknown error while trying to schema table [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public List<String> terms(final String table, final String columnFamily, final String columnName, final String startWith, final short size) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<String>>() {
+        @Override
+        public List<String> call(Client client) throws BlurException, TException {
+          return client.terms(table, columnFamily, columnName, startWith, size);
+        }
+      }, new Merger<List<String>>() {
+        @Override
+        public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
+          TreeSet<String> terms = new TreeSet<String>();
+          while (service.getRemainingCount() > 0) {
+            Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, startWith, size);
+            terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
+          }
+          return new ArrayList<String>(terms).subList(0, Math.min(terms.size(), size));
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName, startWith,
+          size);
+      throw new BException("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName,
+          startWith, size);
+    }
+  }
+
+  private String getNode(String table, Selector selector) throws BlurException, TException {
+    Map<String, String> layout = shardServerLayout(table);
+    String locationId = selector.locationId;
+    if (locationId != null) {
+      String shard = locationId.substring(0, locationId.indexOf('/'));
+      return layout.get(shard);
+    }
+    int numberOfShards = getShardCount(table);
+    if (selector.rowId != null) {
+      String shardName = MutationHelper.getShardName(table, selector.rowId, numberOfShards, _blurPartitioner);
+      return layout.get(shardName);
+    }
+    throw new BlurException("Selector is missing both a locationid and a rowid, one is needed.", null);
+  }
+
+  private <R> R scatterGather(String cluster, final BlurCommand<R> command, Merger<R> merger) throws Exception {
+    return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster), new ParallelCall<String, R>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public R call(String hostnamePort) throws Exception {
+        return _client.execute(hostnamePort, (BlurCommand<R>) command.clone(), _maxDefaultRetries, _defaultDelay, _maxDefaultDelay);
+      }
+    }).merge(merger);
+  }
+
+  private <R> void scatter(String cluster, BlurCommand<R> command) throws Exception {
+    scatterGather(cluster, command, new Merger<R>() {
+      @Override
+      public R merge(BlurExecutorCompletionService<R> service) throws BlurException {
+        while (service.getRemainingCount() > 0) {
+          Future<R> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+          service.getResultThrowException(future);
+        }
+        return null;
+      }
+    });
+  }
+
+  private String getCluster(String table) throws BlurException, TException {
+    TableDescriptor describe = describe(table);
+    if (describe == null) {
+      throw new BlurException("Table [" + table + "] not found.", null);
+    }
+    return describe.cluster;
+  }
+
+  public static Schema merge(Schema result, Schema schema) {
+    Map<String, Set<String>> destColumnFamilies = result.columnFamilies;
+    Map<String, Set<String>> srcColumnFamilies = schema.columnFamilies;
+    for (String srcColumnFamily : srcColumnFamilies.keySet()) {
+      Set<String> destColumnNames = destColumnFamilies.get(srcColumnFamily);
+      Set<String> srcColumnNames = srcColumnFamilies.get(srcColumnFamily);
+      if (destColumnNames == null) {
+        destColumnFamilies.put(srcColumnFamily, srcColumnNames);
+      } else {
+        destColumnNames.addAll(srcColumnNames);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void mutate(final RowMutation mutation) throws BlurException, TException {
+    checkTable(mutation.table);
+    checkForUpdates(mutation.table);
+    try {
+      MutationHelper.validateMutation(mutation);
+      String table = mutation.getTable();
+
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BlurException("Cannot update data while shard is missing", null);
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      _client.execute(node, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.mutate(mutation);
+          return null;
+        }
+      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during mutation of [{0}]", e, mutation);
+      throw new BException("Unknown error during mutation of [{0}]", e, mutation);
+    }
+  }
+
+  private int getShardCount(String table) throws BlurException, TException {
+    Integer numberOfShards = _tableShardCountMap.get(table);
+    if (numberOfShards == null) {
+      TableDescriptor descriptor = describe(table);
+      numberOfShards = descriptor.shardCount;
+      _tableShardCountMap.put(table, numberOfShards);
+    }
+    return numberOfShards;
+  }
+
+  @Override
+  public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    for (RowMutation mutation : mutations) {
+      MutationHelper.validateMutation(mutation);
+    }
+    Map<String, List<RowMutation>> batches = new HashMap<String, List<RowMutation>>();
+    for (RowMutation mutation : mutations) {
+      checkTable(mutation.table);
+      checkForUpdates(mutation.table);
+
+      MutationHelper.validateMutation(mutation);
+      String table = mutation.getTable();
+
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BlurException("Cannot update data while shard is missing", null);
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      List<RowMutation> list = batches.get(node);
+      if (list == null) {
+        list = new ArrayList<RowMutation>();
+        batches.put(node, list);
+      }
+      list.add(mutation);
+
+    }
+
+    for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
+      String node = entry.getKey();
+      final List<RowMutation> mutationsLst = entry.getValue();
+      try {
+        _client.execute(node, new BlurCommand<Void>() {
+          @Override
+          public Void call(Client client) throws BlurException, TException {
+            client.mutateBatch(mutationsLst);
+            return null;
+          }
+        }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+      } catch (Exception e) {
+        LOG.error("Unknown error during mutations of [{0}]", e, mutationsLst);
+        throw new BException("Unknown error during mutations of [{0}]", e, mutationsLst);
+      }
+    }
+  }
+
+  public void setNodeName(String nodeName) {
+    _nodeName = nodeName;
+  }
+
+  public int getRemoteFetchCount() {
+    return _remoteFetchCount;
+  }
+
+  public void setRemoteFetchCount(int remoteFetchCount) {
+    _remoteFetchCount = remoteFetchCount;
+  }
+
+  public long getMaxTimeToLive() {
+    return _maxTimeToLive;
+  }
+
+  public void setMaxTimeToLive(long maxTimeToLive) {
+    _maxTimeToLive = maxTimeToLive;
+  }
+
+  public int getMaxQueryCacheElements() {
+    return _maxQueryCacheElements;
+  }
+
+  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
+    _maxQueryCacheElements = maxQueryCacheElements;
+  }
+
+  public void setQueryChecker(BlurQueryChecker queryChecker) {
+    _queryChecker = queryChecker;
+  }
+
+  public void setThreadCount(int threadCount) {
+    _threadCount = threadCount;
+  }
+
+  public void setMaxFetchRetries(int maxFetchRetries) {
+    _maxFetchRetries = maxFetchRetries;
+  }
+
+  public void setMaxMutateRetries(int maxMutateRetries) {
+    _maxMutateRetries = maxMutateRetries;
+  }
+
+  public void setMaxDefaultRetries(int maxDefaultRetries) {
+    _maxDefaultRetries = maxDefaultRetries;
+  }
+
+  public void setFetchDelay(long fetchDelay) {
+    _fetchDelay = fetchDelay;
+  }
+
+  public void setMutateDelay(long mutateDelay) {
+    _mutateDelay = mutateDelay;
+  }
+
+  public void setDefaultDelay(long defaultDelay) {
+    _defaultDelay = defaultDelay;
+  }
+
+  public void setMaxFetchDelay(long maxFetchDelay) {
+    _maxFetchDelay = maxFetchDelay;
+  }
+
+  public void setMaxMutateDelay(long maxMutateDelay) {
+    _maxMutateDelay = maxMutateDelay;
+  }
+
+  public void setMaxDefaultDelay(long maxDefaultDelay) {
+    _maxDefaultDelay = maxDefaultDelay;
+  }
+
+  public BlurClient getClient() {
+    return _client;
+  }
+
+  public void setClient(BlurClient client) {
+    _client = client;
+  }
+
+  @Override
+  public void optimize(final String table, final int numberOfSegmentsPerShard) throws BlurException, TException {
+    checkTable(table);
+    try {
+      scatter(getCluster(table), new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.optimize(table, numberOfSegmentsPerShard);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
+      throw new BException("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
+    }
+  }
+
+}
\ No newline at end of file

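The controller routes each RowMutation to exactly one shard server: the row id is hashed into one of the table's shards and the current _shardServerLayout maps that shard to a node (see mutate/mutateBatch above). The sketch below shows only the shape of that lookup; the hash, shard names, and node addresses are placeholders, not the actual BlurPartitioner, MutationHelper, or BlurUtil behavior.

  // Illustrative sketch only: rowId -> shard index -> shard name -> node.
  // The real code delegates to BlurPartitioner and MutationHelper.getShardName,
  // and real shard names come from BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i).
  import java.util.HashMap;
  import java.util.Map;

  public class ShardRoutingSketch {

    static int shardIndex(String rowId, int numberOfShards) {
      return (rowId.hashCode() & Integer.MAX_VALUE) % numberOfShards; // placeholder hash
    }

    public static void main(String[] args) {
      int numberOfShards = 4;
      // tableLayout maps shard name -> shard server node, as held in _shardServerLayout above.
      Map<String, String> tableLayout = new HashMap<String, String>();
      for (int i = 0; i < numberOfShards; i++) {
        tableLayout.put("shard-" + i, "shard-node-" + i + ":40020"); // illustrative names and nodes
      }
      String rowId = "row-12345";
      String shard = "shard-" + shardIndex(rowId, numberOfShards);
      System.out.println("rowId " + rowId + " -> " + shard + " on " + tableLayout.get(shard));
    }
  }
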
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
new file mode 100644
index 0000000..c403b8c
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -0,0 +1,323 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.QueryCache;
+import org.apache.blur.utils.QueryCacheEntry;
+import org.apache.blur.utils.QueryCacheKey;
+import org.apache.thrift.TException;
+
+
+public class BlurShardServer extends TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(BlurShardServer.class);
+  private IndexManager _indexManager;
+  private IndexServer _indexServer;
+  private boolean _closed;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private QueryCache _queryCache;
+  private BlurQueryChecker _queryChecker;
+  private ExecutorService _dataFetch;
+  private String _cluster = BlurConstants.BLUR_CLUSTER;
+  private int _dataFetchThreadCount = 32;
+
+  public void init() {
+    _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
+    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+  }
+
+  @Override
+  public BlurResults query(String table, BlurQuery blurQuery) throws BlurException, TException {
+    checkTable(_cluster, table);
+    _queryChecker.checkQuery(blurQuery);
+    try {
+      BlurQuery original = new BlurQuery(blurQuery);
+      if (blurQuery.useCacheIfPresent) {
+        LOG.debug("Using cache for query [{0}] on table [{1}].", blurQuery, table);
+        QueryCacheKey key = QueryCache.getNormalizedBlurQueryKey(table, blurQuery);
+        QueryCacheEntry queryCacheEntry = _queryCache.get(key);
+        if (_queryCache.isValid(queryCacheEntry, _indexServer.getShardListCurrentServerOnly(table))) {
+          LOG.debug("Cache hit for query [{0}] on table [{1}].", blurQuery, table);
+          return queryCacheEntry.getBlurResults(blurQuery);
+        } else {
+          _queryCache.remove(key);
+        }
+      }
+      BlurUtil.setStartTime(original);
+      BlurResultIterable hitsIterable = null;
+      try {
+        AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
+        hitsIterable = _indexManager.query(table, blurQuery, facetCounts);
+        return _queryCache.cache(table, original, BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _dataFetch, blurQuery.selector, this, table));
+      } catch (Exception e) {
+        LOG.error("Unknown error during search of [table={0},searchQuery={1}]", e, table, blurQuery);
+        throw new BException(e.getMessage(), e);
+      } finally {
+        if (hitsIterable != null) {
+          hitsIterable.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Unknown error during search of [table={0},searchQuery={1}]", e, table, blurQuery);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      FetchResult fetchResult = new FetchResult();
+      _indexManager.fetchRow(table, selector, fetchResult);
+      return fetchResult;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get fetch row [table={0},selector={1}]", e, table, selector);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void cancelQuery(String table, long uuid) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      _indexManager.cancelQuery(table, uuid);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to cancel search [uuid={0}]", e, uuid);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public List<BlurQueryStatus> currentQueries(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.currentQueries(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current search status [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public TableStats tableStats(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      TableStats tableStats = new TableStats();
+      tableStats.tableName = table;
+      tableStats.recordCount = _indexServer.getRecordCount(table);
+      tableStats.rowCount = _indexServer.getRowCount(table);
+      tableStats.bytes = _indexServer.getTableSize(table);
+      tableStats.queries = 0;
+      return tableStats;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _indexManager.close();
+      _dataFetch.shutdownNow();
+    }
+  }
+
+  @Override
+  public Map<String, String> shardServerLayout(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
+      Map<String, String> result = new TreeMap<String, String>();
+      String nodeName = _indexServer.getNodeName();
+      for (String shard : blurIndexes.keySet()) {
+        result.put(shard, nodeName);
+      }
+      return result;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to getting shardServerLayout for table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public long recordFrequency(String table, String columnFamily, String columnName, String value) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.recordFrequency(table, columnFamily, columnName, value);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get record frequency for [table={0},columnFamily={1},columnName={2},value={3}]", e, table, columnFamily, columnName, value);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Schema schema(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.schema(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get schema for table [{0}={1}]", e, "table", table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public List<String> terms(String table, String columnFamily, String columnName, String startWith, short size) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.terms(table, columnFamily, columnName, startWith, size);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get terms list for [table={0},columnFamily={1},columnName={2},startWith={3},size={4}]", e, table, columnFamily, columnName,
+          startWith, size);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void mutate(RowMutation mutation) throws BlurException, TException {
+    checkTable(_cluster, mutation.table);
+    checkForUpdates(_cluster, mutation.table);
+    MutationHelper.validateMutation(mutation);
+    try {
+      _indexManager.mutate(mutation);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutation={0}]", e, mutation);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    long s = System.nanoTime();
+    for (RowMutation mutation : mutations) {
+      checkTable(_cluster, mutation.table);
+      checkForUpdates(_cluster, mutation.table);
+      MutationHelper.validateMutation(mutation);
+    }
+    try {
+      _indexManager.mutate(mutations);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutations={0}]", e, mutations);
+      throw new BException(e.getMessage(), e);
+    }
+    long e = System.nanoTime();
+    LOG.debug("mutateBatch took [" + (e - s) / 1000000.0 + " ms] to complete");
+  }
+
+  public long getMaxTimeToLive() {
+    return _maxTimeToLive;
+  }
+
+  public void setMaxTimeToLive(long maxTimeToLive) {
+    _maxTimeToLive = maxTimeToLive;
+  }
+
+  public int getMaxQueryCacheElements() {
+    return _maxQueryCacheElements;
+  }
+
+  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
+    _maxQueryCacheElements = maxQueryCacheElements;
+  }
+
+  public void setQueryChecker(BlurQueryChecker queryChecker) {
+    _queryChecker = queryChecker;
+  }
+
+  public void setIndexManager(IndexManager indexManager) {
+    _indexManager = indexManager;
+  }
+
+  public void setIndexServer(IndexServer indexServer) {
+    _indexServer = indexServer;
+  }
+
+  @Override
+  public BlurQueryStatus queryStatusById(String table, long uuid) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.queryStatus(table, uuid);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current query status [table={0},uuid={1}]", e, table, uuid);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public List<Long> queryStatusIdList(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      return _indexManager.queryStatusIdList(table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status id list [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void optimize(String table, int numberOfSegmentsPerShard) throws BlurException, TException {
+    checkTable(_cluster, table);
+    try {
+      _indexManager.optimize(table, numberOfSegmentsPerShard);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public int getDataFetchThreadCount() {
+    return _dataFetchThreadCount;
+  }
+
+  public void setDataFetchThreadCount(int dataFetchThreadCount) {
+    _dataFetchThreadCount = dataFetchThreadCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/ExecutorServicePerMethodCallThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ExecutorServicePerMethodCallThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ExecutorServicePerMethodCallThriftServer.java
new file mode 100644
index 0000000..4c7918d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ExecutorServicePerMethodCallThriftServer.java
@@ -0,0 +1,163 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExecutorServicePerMethodCallThriftServer extends THsHaServer {
+
+  private static final Object[] ARGS = new Object[] {};
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorServicePerMethodCallThriftServer.class.getName());
+  private Method method;
+
+  public static class Args extends THsHaServer.Args {
+    private int workerThreads = 5;
+    private int stopTimeoutVal = 60;
+    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    private ExecutorService executorService = null;
+    private Map<String, ExecutorService> methodCallsToExecutorService;
+
+    public Map<String, ExecutorService> getMethodCallsToExecutorService() {
+      return methodCallsToExecutorService;
+    }
+
+    public void setMethodCallsToExecutorService(Map<String, ExecutorService> methodCallsToExecutorService) {
+      this.methodCallsToExecutorService = methodCallsToExecutorService;
+    }
+
+    public Args(TNonblockingServerTransport transport) {
+      super(transport);
+    }
+
+    public Args workerThreads(int i) {
+      workerThreads = i;
+      return this;
+    }
+
+    public int getWorkerThreads() {
+      return workerThreads;
+    }
+
+    public int getStopTimeoutVal() {
+      return stopTimeoutVal;
+    }
+
+    public Args stopTimeoutVal(int stopTimeoutVal) {
+      this.stopTimeoutVal = stopTimeoutVal;
+      return this;
+    }
+
+    public TimeUnit getStopTimeoutUnit() {
+      return stopTimeoutUnit;
+    }
+
+    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+      this.stopTimeoutUnit = stopTimeoutUnit;
+      return this;
+    }
+
+    public ExecutorService getExecutorService() {
+      return executorService;
+    }
+
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+  }
+
+  private ExecutorService invokerStandard;
+  private Map<String, ExecutorService> methodCallsToExecutorService;
+
+  public ExecutorServicePerMethodCallThriftServer(Args args) {
+    super(args);
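+    // FrameBuffer.getInputTransport() is not accessible to subclasses, so it is obtained
+    // via reflection in order to peek at the incoming method name before dispatch.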
+    try {
+      method = FrameBuffer.class.getDeclaredMethod("getInputTransport", new Class[] {});
+      method.setAccessible(true);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+    invokerStandard = args.executorService == null ? createInvokerPool(args) : args.executorService;
+    methodCallsToExecutorService = args.methodCallsToExecutorService == null ? new HashMap<String, ExecutorService>() : args.methodCallsToExecutorService;
+  }
+
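+  // Routes each incoming call to the executor registered for its Thrift method name,
+  // falling back to the shared invoker pool when no per-method executor is configured.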
+  @Override
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    try {
+      String name;
+      try {
+        name = readMethodCall(frameBuffer);
+      } catch (TException e) {
+        LOGGER.error("Unexpected exception while invoking!", e);
+        return false;
+      }
+      Runnable invocation = getRunnable(frameBuffer);
+      ExecutorService executorService = methodCallsToExecutorService.get(name);
+      if (executorService == null) {
+        invokerStandard.execute(invocation);
+      } else {
+        executorService.execute(invocation);
+      }
+      return true;
+    } catch (RejectedExecutionException rx) {
+      LOGGER.warn("ExecutorService rejected execution!", rx);
+      return false;
+    }
+  }
+
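+  // Reads just the message header from the frame's input transport to discover the
+  // Thrift method name of the call.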
+  private String readMethodCall(FrameBuffer frameBuffer) throws TException {
+    TTransport transport = getInputTransport(frameBuffer);
+    TProtocol inProt = inputProtocolFactory_.getProtocol(transport);
+    try {
+      TMessage tMessage = inProt.readMessageBegin();
+      return tMessage.name;
+    } finally {
+      transport.close();
+    }
+  }
+
+  private TTransport getInputTransport(FrameBuffer frameBuffer) {
+    try {
+      return (TTransport) method.invoke(frameBuffer, ARGS);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java b/src/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
new file mode 100644
index 0000000..b6a26a0
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/MutationHelper.java
@@ -0,0 +1,69 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.io.BytesWritable;
+
+
+public class MutationHelper {
+
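+  // Resolves the shard that owns a row by hashing the row id with the BlurPartitioner
+  // and converting the resulting partition into the standard shard name.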
+  public static String getShardName(String table, String rowId, int numberOfShards, BlurPartitioner<BytesWritable, ?> blurPartitioner) {
+    BytesWritable key = getKey(rowId);
+    int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+    return BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, partition);
+  }
+
+  public static void validateMutation(RowMutation mutation) {
+    if (mutation == null) {
+      throw new NullPointerException("Mutation can not be null.");
+    }
+    if (mutation.rowId == null) {
+      throw new NullPointerException("Rowid can not be null in mutation.");
+    }
+    if (mutation.table == null) {
+      throw new NullPointerException("Table can not be null in mutation.");
+    }
+  }
+
+  public static BytesWritable getKey(String rowId) {
+    return new BytesWritable(rowId.getBytes());
+  }
+
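+  // Builds a Row from the given record mutations; only REPLACE_ENTIRE_RECORD is
+  // supported here, any other mutation type results in a RuntimeException.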
+  public static Row getRowFromMutations(String id, List<RecordMutation> recordMutations) {
+    Row row = new Row().setId(id);
+    for (RecordMutation mutation : recordMutations) {
+      Record record = mutation.getRecord();
+      switch (mutation.recordMutationType) {
+      case REPLACE_ENTIRE_RECORD:
+        row.addToRecords(record);
+        break;
+      default:
+        throw new RuntimeException("Not supported [" + mutation.recordMutationType + "]");
+      }
+    }
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
new file mode 100644
index 0000000..e8a7331
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -0,0 +1,316 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.ZooKeeper;
+
+
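+/**
+ * Base class for the table administration portion of the Blur Thrift Iface; table
+ * state is read from and written to the cluster via the configured ClusterStatus.
+ */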
+public abstract class TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(TableAdmin.class);
+  protected ZooKeeper _zookeeper;
+  protected ClusterStatus _clusterStatus;
+  protected BlurConfiguration _configuration;
+
+  @Override
+  public TableStats getTableStats(String table) throws BlurException, TException {
+    return tableStats(table);
+  }
+
+  @Override
+  public boolean isInSafeMode(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.isInSafeMode(true, cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error during safe mode check of [cluster={0}]", e, cluster);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public final void createTable(TableDescriptor tableDescriptor) throws BlurException, TException {
+    try {
+      // @todo Remove this once issue #27 is resolved
+      if (tableDescriptor.compressionBlockSize > 32768) {
+        tableDescriptor.compressionBlockSize = 32768;
+      } else if (tableDescriptor.compressionBlockSize < 8192) {
+        tableDescriptor.compressionBlockSize = 8192;
+      }
+      _clusterStatus.createTable(tableDescriptor);
+    } catch (Exception e) {
+      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name, tableDescriptor);
+      throw new BException(e.getMessage(), e);
+    }
+    if (tableDescriptor.isEnabled) {
+      enableTable(tableDescriptor.name);
+    }
+  }
+
+  @Override
+  public final void disableTable(String table) throws BlurException, TException {
+    try {
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.disableTable(cluster, table);
+      waitForTheTableToDisable(cluster, table);
+      waitForTheTableToDisengage(cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during disable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void waitForTheTableToDisengage(String cluster, String table) throws BlurException, TException {
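+    // Intentionally a no-op for now; presumably a placeholder until shard disengagement
+    // can be tracked, as hinted by the commented-out log line below.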
+    // LOG.info("Waiting for shards to disengage on table [" + table + "]");
+  }
+
+  private void waitForTheTableToDisable(String cluster, String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disable on table [" + table + "]");
+    while (true) {
+      if (!_clusterStatus.isEnabled(false, cluster, table)) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
+  @Override
+  public final void enableTable(String table) throws BlurException, TException {
+    try {
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.enableTable(cluster, table);
+      waitForTheTableToEnable(cluster, table);
+      waitForTheTableToEngage(cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error during enable of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private void waitForTheTableToEnable(String cluster, String table) throws BlurException {
+    LOG.info("Waiting for shards to engage on table [" + table + "]");
+    while (true) {
+      if (_clusterStatus.isEnabled(false, cluster, table)) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
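+  // Polls the shard layout every few seconds until every shard of the table has been
+  // assigned to a server.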
+  private void waitForTheTableToEngage(String cluster, String table) throws BlurException, TException {
+    TableDescriptor describe = describe(table);
+    int shardCount = describe.shardCount;
+    LOG.info("Waiting for shards to engage on table [" + table + "]");
+    while (true) {
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while engaging table [" + table + "]", e);
+        throw new BException("Unknown error while engaging table [" + table + "]", e);
+      }
+      try {
+        Map<String, String> shardServerLayout = shardServerLayout(table);
+        LOG.info("Shards [" + shardServerLayout.size() + "/" + shardCount + "] of table [" + table + "] engaged");
+        if (shardServerLayout.size() == shardCount) {
+          return;
+        }
+      } catch (BlurException e) {
+        LOG.info("Stilling waiting", e);
+      } catch (TException e) {
+        LOG.info("Stilling waiting", e);
+      }
+    }
+  }
+
+  @Override
+  public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
+    try {
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      _clusterStatus.removeTable(cluster, table, deleteIndexFiles);
+    } catch (Exception e) {
+      LOG.error("Unknown error during remove of [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public boolean isTableEnabled(boolean useCache, String cluster, String table) {
+    return _clusterStatus.isEnabled(useCache, cluster, table);
+  }
+
+  public void checkTable(String table) throws BlurException {
+    if (table == null) {
+      throw new BlurException("Table cannot be null.", null);
+    }
+    String cluster = _clusterStatus.getCluster(true, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+    checkTable(cluster, table);
+  }
+
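+  // A table is usable only when its cluster is out of safe mode and the table both
+  // exists and is enabled; any other state is reported as a BlurException.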
+  public void checkTable(String cluster, String table) throws BlurException {
+    if (inSafeMode(true, table)) {
+      throw new BlurException("Cluster for [" + table + "] is in safe mode", null);
+    }
+    if (tableExists(true, cluster, table)) {
+      if (isTableEnabled(true, cluster, table)) {
+        return;
+      }
+      throw new BlurException("Table [" + table + "] exists, but is not enabled", null);
+    } else {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+  }
+
+  public void checkForUpdates(String table) throws BlurException {
+    String cluster = _clusterStatus.getCluster(true, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] does not exist", null);
+    }
+    checkForUpdates(cluster, table);
+  }
+
+  public void checkForUpdates(String cluster, String table) throws BlurException {
+    if (_clusterStatus.isReadOnly(true, cluster, table)) {
+      throw new BlurException("Table [" + table + "] in cluster [" + cluster + "] is read only.", null);
+    }
+  }
+
+  @Override
+  public final List<String> controllerServerList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getControllerServerList();
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a controller list.", e);
+      throw new BException("Unknown error while trying to get a controller list.", e);
+    }
+  }
+
+  @Override
+  public final List<String> shardServerList(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.getShardServerList(cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a shard server list.", e);
+      throw new BException("Unknown error while trying to get a shard server list.", e);
+    }
+  }
+
+  @Override
+  public final List<String> shardClusterList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getClusterList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a cluster list.", e);
+      throw new BException("Unknown error while trying to get a cluster list.", e);
+    }
+  }
+
+  @Override
+  public final TableDescriptor describe(final String table) throws BlurException, TException {
+    try {
+      String cluster = _clusterStatus.getCluster(true, table);
+      if (cluster == null) {
+        throw new BlurException("Table [" + table + "] not found.", null);
+      }
+      return _clusterStatus.getTableDescriptor(true, cluster, table);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to describe a table [" + table + "].", e);
+      throw new BException("Unknown error while trying to describe a table [" + table + "].", e);
+    }
+  }
+
+  @Override
+  public final List<String> tableListByCluster(String cluster) throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableList(true, cluster);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a table list by cluster [" + cluster + "].", e);
+      throw new BException("Unknown error while trying to get a table list by cluster [" + cluster + "].", e);
+    }
+  }
+
+  @Override
+  public final List<String> tableList() throws BlurException, TException {
+    try {
+      return _clusterStatus.getTableList(true);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get a table list.", e);
+      throw new BException("Unknown error while trying to get a table list.", e);
+    }
+  }
+
+  private boolean inSafeMode(boolean useCache, String table) throws BlurException {
+    String cluster = _clusterStatus.getCluster(useCache, table);
+    if (cluster == null) {
+      throw new BlurException("Table [" + table + "] not found.", null);
+    }
+    return _clusterStatus.isInSafeMode(useCache, cluster);
+  }
+
+  public boolean tableExists(boolean useCache, String cluster, String table) {
+    return _clusterStatus.exists(useCache, cluster, table);
+  }
+
+  public ClusterStatus getClusterStatus() {
+    return _clusterStatus;
+  }
+
+  public void setClusterStatus(ClusterStatus clusterStatus) {
+    _clusterStatus = clusterStatus;
+  }
+
+  public void setZookeeper(ZooKeeper zookeeper) {
+    _zookeeper = zookeeper;
+  }
+
+  @Override
+  public Map<String, String> configuration() throws BlurException, TException {
+    return _configuration.getProperties();
+  }
+}

