incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [46/92] [abbrv] [partial] Fixed BLUR-126.
Date Tue, 11 Jun 2013 02:41:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
new file mode 100644
index 0000000..6a5cf05
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -0,0 +1,447 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.SEP;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.record.Utils;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+
+public class TransactionRecorder extends TimerTask implements Closeable {
+
+  enum TYPE {
+    DELETE((byte) 0), ROW((byte) 1);
+    private byte b;
+
+    private TYPE(byte b) {
+      this.b = b;
+    }
+
+    public byte value() {
+      return b;
+    }
+
+    public static TYPE lookup(byte b) {
+      switch (b) {
+      case 0:
+        return DELETE;
+      case 1:
+        return ROW;
+      default:
+        throw new RuntimeException("Type not found [" + b + "]");
+      }
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(TransactionRecorder.class);
+  public static FieldType ID_TYPE;
+  static {
+    ID_TYPE = new FieldType();
+    ID_TYPE.setIndexed(true);
+    ID_TYPE.setTokenized(false);
+    ID_TYPE.setOmitNorms(true);
+    ID_TYPE.setStored(true);
+    ID_TYPE.freeze();
+  }
+
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private final AtomicReference<FSDataOutputStream> _outputStream = new AtomicReference<FSDataOutputStream>();
+  private final long _timeBetweenSyncsNanos;
+  private final AtomicLong _lastSync = new AtomicLong();
+
+  private final Path _walPath;
+  private final Configuration _configuration;
+  private final BlurAnalyzer _analyzer;
+  private final FileSystem _fileSystem;
+  private final Timer _timer;
+  private final String _table;
+  private final String _shard;
+
+  public TransactionRecorder(ShardContext shardContext) throws IOException {
+    TableContext tableContext = shardContext.getTableContext();
+    _configuration = tableContext.getConfiguration();
+    _analyzer = tableContext.getAnalyzer();
+    _walPath = shardContext.getWalShardPath();
+    _fileSystem = _walPath.getFileSystem(_configuration);
+    _timeBetweenSyncsNanos = tableContext.getTimeBetweenWALSyncsNanos();
+    _timer = new Timer("wal-sync-[" + tableContext.getTable() + "/" + shardContext.getShard() + "]", true);
+    _timer.schedule(this, TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos),
+        TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos));
+    _table = tableContext.getTable();
+    _shard = shardContext.getShard();
+  }
+
+  public void open() throws IOException {
+    if (_fileSystem.exists(_walPath)) {
+      throw new IOException("WAL path [" + _walPath + "] still exists, replay must have not worked.");
+    } else {
+      _outputStream.set(_fileSystem.create(_walPath));
+    }
+    if (_outputStream == null) {
+      throw new RuntimeException();
+    }
+    _lastSync.set(System.nanoTime());
+  }
+
+  public void replay(IndexWriter writer) throws IOException {
+    if (_fileSystem.exists(_walPath)) {
+      FSDataInputStream inputStream = _fileSystem.open(_walPath);
+      replay(writer, inputStream);
+      inputStream.close();
+      commit(writer);
+    } else {
+      open();
+    }
+  }
+
+  private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
+    long updateCount = 0;
+    long deleteCount = 0;
+    byte[] buffer;
+    while ((buffer = readBuffer(inputStream)) != null) {
+      DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
+      TYPE lookup = TYPE.lookup(dataInputStream.readByte());
+      switch (lookup) {
+      case ROW:
+        Row row = readRow(dataInputStream);
+        writer.updateDocuments(createRowId(row.id), getDocs(row, _analyzer));
+        updateCount++;
+        continue;
+      case DELETE:
+        String deleteRowId = readString(dataInputStream);
+        writer.deleteDocuments(createRowId(deleteRowId));
+        deleteCount++;
+        continue;
+      default:
+        LOG.error("Unknown type [{0}]", lookup);
+        throw new IOException("Unknown type [" + lookup + "]");
+      }
+    }
+    LOG.info("Rows reclaimed from the WAL [{0}]", updateCount);
+    LOG.info("Deletes reclaimed from the WAL [{0}]", deleteCount);
+  }
+
+  private byte[] readBuffer(DataInputStream inputStream) {
+    try {
+      int length = inputStream.readInt();
+      byte[] buffer = new byte[length];
+      inputStream.readFully(buffer);
+      return buffer;
+    } catch (IOException e) {
+      if (e instanceof EOFException) {
+        return null;
+      }
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  private void rollLog() throws IOException {
+    LOG.debug("Rolling WAL path [" + _walPath + "]");
+    FSDataOutputStream os = _outputStream.get();
+    if (os != null) {
+      os.close();
+    }
+    _fileSystem.delete(_walPath, false);
+    open();
+  }
+
+  public void close() throws IOException {
+    synchronized (_running) {
+      _running.set(false);
+    }
+    _timer.purge();
+    _timer.cancel();
+    _outputStream.get().close();
+  }
+
+  private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
+    writeString(outputStream, row.id);
+    List<Record> records = row.records;
+    int size = records.size();
+    outputStream.writeInt(size);
+    for (int i = 0; i < size; i++) {
+      Record record = records.get(i);
+      writeRecord(outputStream, record);
+    }
+  }
+
+  private static Row readRow(DataInputStream inputStream) throws IOException {
+    Row row = new Row();
+    row.id = readString(inputStream);
+    int size = inputStream.readInt();
+    for (int i = 0; i < size; i++) {
+      row.addToRecords(readRecord(inputStream));
+    }
+    return row;
+  }
+
+  private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException {
+    writeString(outputStream, record.recordId);
+    writeString(outputStream, record.family);
+    List<Column> columns = record.columns;
+    int size = columns.size();
+    outputStream.writeInt(size);
+    for (int i = 0; i < size; i++) {
+      writeColumn(outputStream, columns.get(i));
+    }
+  }
+
+  private static Record readRecord(DataInputStream inputStream) throws IOException {
+    Record record = new Record();
+    record.recordId = readString(inputStream);
+    record.family = readString(inputStream);
+    int size = inputStream.readInt();
+    for (int i = 0; i < size; i++) {
+      record.addToColumns(readColumn(inputStream));
+    }
+    return record;
+  }
+
+  private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException {
+    writeString(outputStream, column.name);
+    writeString(outputStream, column.value);
+  }
+
+  private static Column readColumn(DataInputStream inputStream) throws IOException {
+    Column column = new Column();
+    column.name = readString(inputStream);
+    column.value = readString(inputStream);
+    return column;
+  }
+
+  private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException {
+    writeString(outputStream, deleteRowId);
+  }
+
+  private static void writeString(DataOutputStream outputStream, String s) throws IOException {
+    byte[] bs = s.getBytes();
+    Utils.writeVInt(outputStream, bs.length);
+    outputStream.write(bs);
+  }
+
+  private static String readString(DataInputStream inputStream) throws IOException {
+    int length = Utils.readVInt(inputStream);
+    byte[] buffer = new byte[length];
+    inputStream.readFully(buffer);
+    return new String(buffer);
+  }
+
+  private void sync(byte[] bs) throws IOException {
+    if (bs == null || _outputStream == null) {
+      throw new RuntimeException("bs [" + bs + "] outputStream [" + _outputStream + "]");
+    }
+    synchronized (_running) {
+      FSDataOutputStream os = _outputStream.get();
+      os.writeInt(bs.length);
+      os.write(bs);
+      tryToSync(os);
+    }
+  }
+
+  private void tryToSync() throws IOException {
+    synchronized (_running) {
+      tryToSync(_outputStream.get());
+    }
+  }
+
+  private void tryToSync(FSDataOutputStream os) throws IOException {
+    if (os == null) {
+      return;
+    }
+    long now = System.nanoTime();
+    if (_lastSync.get() + _timeBetweenSyncsNanos < now) {
+      os.sync();
+      _lastSync.set(now);
+    }
+  }
+
+  public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      synchronized (_running) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(baos);
+        outputStream.writeByte(TYPE.ROW.value());
+        writeRow(outputStream, row);
+        outputStream.close();
+        sync(baos.toByteArray());
+      }
+    }
+    Term term = createRowId(row.id);
+    List<Document> docs = getDocs(row, _analyzer);
+    return writer.updateDocuments(term, docs);
+  }
+
+  public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      synchronized (_running) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(baos);
+        outputStream.writeByte(TYPE.DELETE.value());
+        writeDelete(outputStream, rowId);
+        outputStream.close();
+        sync(baos.toByteArray());
+      }
+    }
+    return writer.deleteDocuments(createRowId(rowId));
+  }
+
+  public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
+    synchronized (_running) {
+      long s = System.nanoTime();
+      writer.commit();
+      long m = System.nanoTime();
+      LOG.debug("Commit took [{0} ms] for [{1}/{2}]", (m - s) / 1000000.0, _table, _shard);
+      rollLog();
+      long e = System.nanoTime();
+      LOG.debug("Log roller took [{0} ms] for [{1}/{2}]", (e - m) / 1000000.0, _table, _shard);
+    }
+  }
+
+  public static List<Document> getDocs(Row row, BlurAnalyzer analyzer) {
+    List<Record> records = row.records;
+    int size = records.size();
+    final String rowId = row.id;
+    final StringBuilder builder = new StringBuilder();
+    List<Document> docs = new ArrayList<Document>(size);
+    for (int i = 0; i < size; i++) {
+      Document document = convert(rowId, records.get(i), builder, analyzer);
+      if (i == 0) {
+        document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      }
+      docs.add(document);
+    }
+    return docs;
+  }
+
+  public static Document convert(String rowId, Record record, StringBuilder builder, BlurAnalyzer analyzer) {
+    BlurUtil.validateRowIdAndRecord(rowId, record);
+    Document document = new Document();
+    document.add(new Field(BlurConstants.ROW_ID, rowId, ID_TYPE));
+    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, ID_TYPE));
+    document.add(new Field(BlurConstants.FAMILY, record.family, ID_TYPE));
+    addColumns(document, analyzer, builder, record.family, record.columns);
+    return document;
+  }
+
+  private Term createRowId(String id) {
+    return new Term(BlurConstants.ROW_ID, id);
+  }
+
+  @Override
+  public void run() {
+    try {
+      if (_running.get()) {
+        tryToSync();
+      }
+    } catch (IOException e) {
+      if (_running.get()) {
+        if (e.getMessage().equals("DFSOutputStream is closed")) {
+          LOG.warn("Trying to sync the outputstrema and the stream has been closed.  This is probably a test and the filesystem has been closed.");
+          try {
+            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+          } catch (InterruptedException ex) {
+            return;
+          }
+        } else {
+          LOG.error("Known error while trying to sync.", e);
+        }
+      }
+    }
+  }
+  
+  public static boolean addColumns(Document document, BlurAnalyzer analyzer, StringBuilder builder,
+      String columnFamily, Iterable<Column> set) {
+    if (set == null) {
+      return false;
+    }
+    builder.setLength(0);
+    OUTER: for (Column column : set) {
+      String name = column.getName();
+      String value = column.value;
+      if (value == null || name == null) {
+        continue OUTER;
+      }
+      String fieldName = getFieldName(columnFamily, name);
+      FieldType fieldType = analyzer.getFieldType(fieldName);
+      Field field = analyzer.getField(fieldName, value, fieldType);
+      document.add(field);
+      
+      // @TODO remove full text stuff
+//      if (analyzer.isFullTextField(fieldName)) {
+//        builder.append(value).append(' ');
+//      }
+      Set<String> subFieldNames = analyzer.getSubIndexNames(fieldName);
+      if (subFieldNames != null) {
+        for (String subFieldName : subFieldNames) {
+          FieldType subFieldType = analyzer.getFieldType(subFieldName);
+          document.add(analyzer.getField(subFieldName, value, subFieldType));
+        }
+      }
+    }
+//    if (builder.length() != 0) {
+//      String superValue = builder.toString();
+//      document.add(new Field(SUPER, superValue, Store.NO, Index.ANALYZED_NO_NORMS));
+//    }
+    return true;
+  }
+
+  public static String getFieldName(String columnFamily, String name) {
+    return columnFamily + SEP + name;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/Configurable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/Configurable.java b/blur-core/src/main/java/org/apache/blur/server/Configurable.java
new file mode 100644
index 0000000..01b9268
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/Configurable.java
@@ -0,0 +1,25 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public interface Configurable {
+
+  public void setTableContext(TableContext context);
+
+  public TableContext getTableContext();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java b/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
new file mode 100644
index 0000000..64574d8
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
@@ -0,0 +1,34 @@
+package org.apache.blur.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NRTManager;
+import org.apache.lucene.store.Directory;
+
+public class IndexSearcherClosable extends IndexSearcher implements Closeable {
+
+  private final AtomicReference<NRTManager> _nrtManagerRef;
+  private final Directory _directory;
+
+  public IndexSearcherClosable(IndexReader r, ExecutorService executor, AtomicReference<NRTManager> nrtManagerRef,
+      Directory directory) {
+    super(r, executor);
+    _nrtManagerRef = nrtManagerRef;
+    _directory = directory;
+  }
+
+  public Directory getDirectory() {
+    return _directory;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _nrtManagerRef.get().release(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardContext.java b/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
new file mode 100644
index 0000000..69e3786
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
@@ -0,0 +1,89 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+
+public class ShardContext {
+
+  private String shard;
+  private Path walShardPath;
+  private Path hdfsDirPath;
+  private Directory directory;
+  private TableContext tableContext;
+
+  public TableContext getTableContext() {
+    return tableContext;
+  }
+
+  public void setTableContext(TableContext tableContext) {
+    this.tableContext = tableContext;
+  }
+
+  protected ShardContext() {
+
+  }
+
+  public Directory getDirectory() {
+    return directory;
+  }
+
+  public void setDirectory(Directory directory) {
+    this.directory = directory;
+  }
+
+  public Path getHdfsDirPath() {
+    return hdfsDirPath;
+  }
+
+  public void setHdfsDirPath(Path hdfsDirPath) {
+    this.hdfsDirPath = hdfsDirPath;
+  }
+
+  public String getShard() {
+    return shard;
+  }
+
+  public void setShard(String shard) {
+    this.shard = shard;
+  }
+
+  public Path getWalShardPath() {
+    return walShardPath;
+  }
+
+  public void setWalShardPath(Path walShardPath) {
+    this.walShardPath = walShardPath;
+  }
+
+  public static ShardContext create(TableContext tableContext, String shard) throws IOException {
+    BlurUtil.validateShardName(shard);
+    ShardContext shardContext = new ShardContext();
+    shardContext.tableContext = tableContext;
+    shardContext.walShardPath = new Path(tableContext.getWalTablePath(), shard);
+    shardContext.hdfsDirPath = new Path(tableContext.getTablePath(), shard);
+    shardContext.shard = shard;
+    shardContext.directory = new HdfsDirectory(tableContext.getConfiguration(), shardContext.hdfsDirPath);
+    return shardContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
new file mode 100644
index 0000000..44a1413
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java
@@ -0,0 +1,133 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * The thrift session that will hold index reader references to maintain across
+ * query and fetch calls. Since there is a fixed size thread pool issuing calls
+ * that involve the _threadsToContext map where Thread is the key we don't need
+ * to clear or reset threads.
+ */
+public class ShardServerContext implements ServerContext {
+
+  private static final Log LOG = LogFactory.getLog(ShardServerContext.class);
+
+  private final static Map<Thread, ShardServerContext> _threadsToContext = new ConcurrentHashMap<Thread, ShardServerContext>();
+  private final Map<String, IndexSearcherClosable> _indexSearcherMap = new HashMap<String, IndexSearcherClosable>();
+
+  /**
+   * Registers the {@link ShardServerContext} for this thread.
+   * 
+   * @param context
+   *          the {@link ShardServerContext}.
+   */
+  public static void registerContextForCall(ShardServerContext context) {
+    _threadsToContext.put(Thread.currentThread(), context);
+  }
+
+  /**
+   * Gets the {@link ShardServerContext} for this {@link Thread}.
+   * 
+   * @return the {@link ShardServerContext}.
+   */
+  public static ShardServerContext getShardServerContext() {
+    return _threadsToContext.get(Thread.currentThread());
+  }
+
+  /**
+   * Resets the context, this closes and releases the index readers.
+   */
+  public static void resetSearchers() {
+    ShardServerContext shardServerContext = getShardServerContext();
+    if (shardServerContext != null) {
+      shardServerContext.reset();
+    }
+  }
+
+  /**
+   * Closes this context, this happens when the client closes it's connect to
+   * the server.
+   */
+  public void close() {
+    reset();
+  }
+
+  /**
+   * Resets the {@link ShardServerContext} by closing the searchers.
+   */
+  public void reset() {
+    Collection<IndexSearcherClosable> values = _indexSearcherMap.values();
+    for (IndexSearcherClosable indexSearcherClosable : values) {
+      LOG.info("Closing [{0}]", indexSearcherClosable);
+      IOUtils.cleanup(LOG, indexSearcherClosable);
+    }
+    _indexSearcherMap.clear();
+  }
+
+  /**
+   * Gets the cached {@link IndexSearcherClosable} (if any) for the given table
+   * and shard.
+   * 
+   * @param table
+   *          the stable name.
+   * @param shard
+   *          the shard name.
+   * @return the {@link IndexSearcherClosable} or null if not present.
+   */
+  public IndexSearcherClosable getIndexSearcherClosable(String table, String shard) {
+    IndexSearcherClosable indexSearcherClosable = _indexSearcherMap.get(getKey(table, shard));
+    if (indexSearcherClosable != null) {
+      LOG.info("Using cached searcher [{0}] for table [{1}] shard [{2}]", indexSearcherClosable, table, shard);
+    }
+    return indexSearcherClosable;
+  }
+
+  /**
+   * Sets the index searcher for this {@link ShardServerContext} for the given
+   * table and shard.
+   * 
+   * @param table
+   *          the table name.
+   * @param shard
+   *          the shard name.
+   * @param searcher
+   *          the {@link IndexSearcherClosable}.
+   * @throws IOException
+   */
+  public void setIndexSearcherClosable(String table, String shard, IndexSearcherClosable searcher) throws IOException {
+    IndexSearcherClosable indexSearcherClosable = _indexSearcherMap.put(getKey(table, shard), searcher);
+    if (indexSearcherClosable != null && searcher != indexSearcherClosable) {
+      indexSearcherClosable.close();
+    }
+  }
+
+  private String getKey(String table, String shard) {
+    return table + "/" + shard;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
new file mode 100644
index 0000000..d8eb7cb
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
@@ -0,0 +1,59 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServerEventHandler;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+
+/**
+ * {@link ShardServerContext} is the session manager for the shard servers. It
+ * allows for reader reuse across method calls.
+ */
+public class ShardServerEventHandler implements TServerEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(ShardServerEventHandler.class);
+
+  @Override
+  public void preServe() {
+    LOG.debug("preServe");
+  }
+
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    LOG.debug("Client connected");
+    return new ShardServerContext();
+  }
+
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    LOG.debug("Client disconnected");
+    ShardServerContext context = (ShardServerContext) serverContext;
+    context.close();
+  }
+
+  @Override
+  public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+    LOG.debug("Method called");
+    ShardServerContext context = (ShardServerContext) serverContext;
+    ShardServerContext.registerContextForCall(context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
new file mode 100644
index 0000000..1427f1d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -0,0 +1,173 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_SAHRD_INDEX_SIMILARITY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+import static org.apache.blur.utils.BlurConstants.SUPER;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.similarities.DefaultSimilarity;
+import org.apache.lucene.search.similarities.Similarity;
+
+/**
+ * Per-table runtime context: paths, analyzer, similarity, deletion policy and
+ * timing knobs derived from a {@link TableDescriptor}. Instances are cached
+ * so there is exactly one shared context per table name.
+ */
+public class TableContext {
+
+  private static final Log LOG = LogFactory.getLog(TableContext.class);
+
+  // Sub-directory under the table path where write-ahead logs are kept.
+  private static final String LOGS = "logs";
+
+  private Path tablePath;
+  private Path walTablePath;
+  private BlurAnalyzer analyzer;
+  private String defaultFieldName;
+  private String table;
+  private IndexDeletionPolicy indexDeletionPolicy;
+  private Similarity similarity;
+  private Configuration configuration;
+  private TableDescriptor descriptor;
+  private long timeBetweenCommits;
+  private long timeBetweenRefreshs;
+  private ScoreType defaultScoreType;
+  private Term defaultPrimeDocTerm;
+
+  // One shared context per table name, across all callers.
+  private static ConcurrentHashMap<String, TableContext> cache = new ConcurrentHashMap<String, TableContext>();
+
+  protected TableContext() {
+
+  }
+
+  /** Drops all cached contexts; subsequent {@link #create} calls rebuild them. */
+  public static void clear() {
+    cache.clear();
+  }
+
+  /**
+   * Returns the shared context for the given table, building and caching it
+   * on first use. Table properties from the descriptor override the default
+   * hadoop {@link Configuration}.
+   */
+  public static TableContext create(TableDescriptor tableDescriptor) {
+    String name = tableDescriptor.getName();
+    TableContext tableContext = cache.get(name);
+    if (tableContext != null) {
+      return tableContext;
+    }
+    LOG.info("Creating table context for table [{0}]", name);
+    Configuration configuration = new Configuration();
+    Map<String, String> tableProperties = tableDescriptor.getTableProperties();
+    if (tableProperties != null) {
+      for (Entry<String, String> prop : tableProperties.entrySet()) {
+        configuration.set(prop.getKey(), prop.getValue());
+      }
+    }
+
+    tableContext = new TableContext();
+    tableContext.configuration = configuration;
+    tableContext.tablePath = new Path(tableDescriptor.getTableUri());
+    tableContext.walTablePath = new Path(tableContext.tablePath, LOGS);
+    tableContext.analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
+    tableContext.defaultFieldName = SUPER;
+    tableContext.table = name;
+    tableContext.descriptor = tableDescriptor;
+    tableContext.timeBetweenCommits = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000);
+    tableContext.timeBetweenRefreshs = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 5000);
+    tableContext.defaultPrimeDocTerm = new Term("_prime_", "true");
+    tableContext.defaultScoreType = ScoreType.SUPER;
+
+    Class<?> c1 = configuration.getClass(BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE, KeepOnlyLastCommitDeletionPolicy.class);
+    tableContext.indexDeletionPolicy = (IndexDeletionPolicy) configure(ReflectionUtils.newInstance(c1, configuration), tableContext);
+    // NOTE: "SAHRD" is a typo in the constant's name (declared in
+    // BlurConstants); it cannot be corrected here without touching that file.
+    Class<?> c2 = configuration.getClass(BLUR_SAHRD_INDEX_SIMILARITY, DefaultSimilarity.class);
+    tableContext.similarity = (Similarity) configure(ReflectionUtils.newInstance(c2, configuration), tableContext);
+
+    // BUGFIX: use putIfAbsent instead of put so two threads racing past the
+    // cache.get above converge on a single shared instance per table.
+    TableContext existing = cache.putIfAbsent(name, tableContext);
+    return existing == null ? tableContext : existing;
+  }
+
+  /** Injects this context into the object when it implements Configurable. */
+  private static Object configure(Object o, TableContext tableContext) {
+    if (o instanceof Configurable) {
+      ((Configurable) o).setTableContext(tableContext);
+    }
+    return o;
+  }
+
+  public IndexDeletionPolicy getIndexDeletionPolicy() {
+    return indexDeletionPolicy;
+  }
+
+  public Similarity getSimilarity() {
+    return similarity;
+  }
+
+  public long getTimeBetweenCommits() {
+    return timeBetweenCommits;
+  }
+
+  public long getTimeBetweenRefreshs() {
+    return timeBetweenRefreshs;
+  }
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public TableDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+  public Path getTablePath() {
+    return tablePath;
+  }
+
+  public Path getWalTablePath() {
+    return walTablePath;
+  }
+
+  public String getDefaultFieldName() {
+    return defaultFieldName;
+  }
+
+  public Term getDefaultPrimeDocTerm() {
+    return defaultPrimeDocTerm;
+  }
+
+  public ScoreType getDefaultScoreType() {
+    return defaultScoreType;
+  }
+
+  /** Fixed 10ms WAL sync interval, expressed in nanoseconds. */
+  public long getTimeBetweenWALSyncsNanos() {
+    return TimeUnit.MILLISECONDS.toNanos(10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
new file mode 100644
index 0000000..20837b3
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -0,0 +1,926 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableClient;
+import org.apache.blur.manager.results.LazyBlurResult;
+import org.apache.blur.manager.results.MergerBlurResultIterable;
+import org.apache.blur.manager.stats.MergerTableStats;
+import org.apache.blur.manager.status.MergerQueryStatus;
+import org.apache.blur.manager.status.MergerQueryStatusSingle;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.ForkJoin.Merger;
+import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.blur.zookeeper.WatchNodeExistance;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class BlurControllerServer extends TableAdmin implements Iface {
+
+  /**
+   * Abstraction over how the controller executes a command against a remote
+   * node, so an alternate transport can be substituted (e.g. in tests).
+   */
+  public static abstract class BlurClient {
+    public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime,
+        long maxBackOffTime) throws BlurException, TException, IOException;
+  }
+
+  /** Default implementation that delegates to the thrift BlurClientManager. */
+  public static class BlurClientRemote extends BlurClient {
+    @Override
+    public <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime)
+        throws BlurException, TException, IOException {
+      return BlurClientManager.execute(node, command, maxRetries, backOffTime, maxBackOffTime);
+    }
+  }
+  private static final String CONTROLLER_THREAD_POOL = "controller-thread-pool";
+  private static final Log LOG = LogFactory.getLog(BlurControllerServer.class);
+  // Shared empty placeholders used when a query supplies no selector.
+  // NOTE(review): these are mutable HashMap/HashSet instances handed to
+  // Selector objects — confirm nothing ever mutates them.
+  private static final Map<String, Set<String>> EMPTY_MAP = new HashMap<String, Set<String>>();
+  private static final Set<String> EMPTY_SET = new HashSet<String>();
+
+  private ExecutorService _executor;
+  // table -> (shard -> shard server); swapped atomically by updateLayout().
+  private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout = new AtomicReference<Map<String, Map<String, String>>>(
+      new HashMap<String, Map<String, String>>());
+  private BlurClient _client;
+  private int _threadCount = 64;
+  private AtomicBoolean _closed = new AtomicBoolean();
+  private Map<String, Integer> _tableShardCountMap = new ConcurrentHashMap<String, Integer>();
+  private BlurPartitioner _blurPartitioner = new BlurPartitioner();
+  private String _nodeName;
+  private int _remoteFetchCount = 100;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private BlurQueryChecker _queryChecker;
+  private AtomicBoolean _running = new AtomicBoolean();
+
+  // Retry counts and backoff delays (milliseconds) per operation type.
+  private int _maxFetchRetries = 3;
+  private int _maxMutateRetries = 3;
+  private int _maxDefaultRetries = 3;
+  private long _fetchDelay = 500;
+  private long _mutateDelay = 500;
+  private long _defaultDelay = 500;
+  private long _maxFetchDelay = 2000;
+  private long _maxMutateDelay = 2000;
+  private long _maxDefaultDelay = 2000;
+
+  private long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
+  // Zookeeper watchers keyed by cluster name; the *Existance watchers wait
+  // for a parent path to appear before the child watchers are installed.
+  private WatchChildren _watchForClusters;
+  private ConcurrentMap<String, WatchNodeExistance> _watchForTablesPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchNodeExistance> _watchForOnlineShardsPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
+  private ConcurrentMap<String, WatchChildren> _watchForTablesPerCluster = new ConcurrentHashMap<String, WatchChildren>();
+  private ConcurrentMap<String, WatchChildren> _watchForOnlineShardsPerCluster = new ConcurrentHashMap<String, WatchChildren>();
+
+  /**
+   * Starts the controller: creates the base zookeeper paths, registers this
+   * node as an online controller, starts the worker pool, and installs the
+   * zookeeper watchers that keep the shard layout current.
+   */
+  public void init() throws KeeperException, InterruptedException {
+    setupZookeeper();
+    registerMyself();
+    _executor = Executors.newThreadPool(CONTROLLER_THREAD_POOL, _threadCount);
+    _running.set(true);
+    watchForClusterChanges();
+    List<String> clusterList = _clusterStatus.getClusterList(false);
+    for (String cluster : clusterList) {
+      watchForLayoutChanges(cluster);
+    }
+    // Compute an initial layout so queries can be routed before any watcher fires.
+    updateLayout();
+  }
+
+  /** Ensures the base zookeeper paths used by the controller exist. */
+  private void setupZookeeper() throws KeeperException, InterruptedException {
+    String[] requiredPaths = new String[] { "/blur", ZookeeperPathConstants.getOnlineControllersPath(),
+        ZookeeperPathConstants.getClustersPath() };
+    for (String path : requiredPaths) {
+      BlurUtil.createIfMissing(_zookeeper, path);
+    }
+  }
+
+  /**
+   * Watches the clusters path; whenever the set of clusters changes, installs
+   * layout watchers for every listed cluster (idempotent per cluster).
+   */
+  private void watchForClusterChanges() throws KeeperException, InterruptedException {
+    _watchForClusters = new WatchChildren(_zookeeper, ZookeeperPathConstants.getClustersPath());
+    _watchForClusters.watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        for (String cluster : children) {
+          try {
+            watchForLayoutChanges(cluster);
+          } catch (KeeperException e) {
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            // NOTE(review): the interrupt flag is not restored before
+            // rethrowing — confirm whether the watcher thread should call
+            // Thread.currentThread().interrupt() here.
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Installs the watchers that drive layout recomputation for one cluster:
+   * one on the tables path (table add/remove) and one on the online shards
+   * path (shard server up/down). Each existence watcher waits for its parent
+   * path to appear before installing the child watcher.
+   */
+  private void watchForLayoutChanges(final String cluster) throws KeeperException, InterruptedException {
+    WatchNodeExistance we1 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
+    we1.watch(new WatchNodeExistance.OnChange() {
+      @Override
+      public void action(Stat stat) {
+        if (stat != null) {
+          watch(cluster, ZookeeperPathConstants.getTablesPath(cluster), _watchForTablesPerCluster);
+        }
+      }
+    });
+    if (_watchForTablesPerClusterExistance.putIfAbsent(cluster, we1) != null) {
+      // Another thread already registered this watcher; drop the duplicate.
+      we1.close();
+    }
+
+    // BUGFIX: this existence watcher previously targeted getTablesPath again
+    // (copy/paste from we1); it must watch the online shards path so shard
+    // server online/offline events actually trigger a layout update.
+    WatchNodeExistance we2 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster));
+    we2.watch(new WatchNodeExistance.OnChange() {
+      @Override
+      public void action(Stat stat) {
+        if (stat != null) {
+          watch(cluster, ZookeeperPathConstants.getOnlineShardsPath(cluster), _watchForOnlineShardsPerCluster);
+        }
+      }
+    });
+    if (_watchForOnlineShardsPerClusterExistance.putIfAbsent(cluster, we2) != null) {
+      we2.close();
+    }
+  }
+
+  /**
+   * Registers a child watcher on the given path that recomputes the layout
+   * whenever the children change. If a watcher for this cluster is already
+   * present in the map, the newly created one is closed instead.
+   */
+  private void watch(String cluster, String path, ConcurrentMap<String, WatchChildren> map) {
+    WatchChildren childWatcher = new WatchChildren(_zookeeper, path);
+    childWatcher.watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        LOG.info("Layout change.");
+        updateLayout();
+      }
+    });
+    if (map.putIfAbsent(cluster, childWatcher) != null) {
+      // Lost the race; another watcher is already in place for this cluster.
+      childWatcher.close();
+    }
+  }
+
+  /**
+   * Recomputes the table -> (shard -> server) layout for every known table
+   * and publishes it with a single atomic swap. Synchronized so concurrent
+   * watcher callbacks do not interleave partial layouts.
+   */
+  private synchronized void updateLayout() {
+    if (!_clusterStatus.isOpen()) {
+      LOG.warn("The cluster status object has been closed.");
+      return;
+    }
+    List<String> tableList = _clusterStatus.getTableList(false);
+    HashMap<String, Map<String, String>> newLayout = new HashMap<String, Map<String, String>>();
+    for (String table : tableList) {
+      DistributedLayoutManager layoutManager = new DistributedLayoutManager();
+      String cluster = _clusterStatus.getCluster(false, table);
+      if (cluster == null) {
+        // Table disappeared between the list call and now; skip it.
+        continue;
+      }
+      List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
+      List<String> offlineShardServers = _clusterStatus.getOfflineShardServers(false, cluster);
+      List<String> shardList = getShardList(cluster, table);
+      layoutManager.setNodes(shardServerList);
+      layoutManager.setNodesOffline(offlineShardServers);
+      layoutManager.setShards(shardList);
+      layoutManager.init();
+      Map<String, String> layout = layoutManager.getLayout();
+      newLayout.put(table, layout);
+    }
+    // Single atomic swap so readers never observe a half-built layout.
+    _shardServerLayout.set(newLayout);
+  }
+
+  /** Builds the list of shard names for a table from its configured shard count. */
+  private List<String> getShardList(String cluster, String table) {
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    int shardCount = tableDescriptor.shardCount;
+    List<String> shardNames = new ArrayList<String>(shardCount);
+    for (int shard = 0; shard < shardCount; shard++) {
+      shardNames.add(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shard));
+    }
+    return shardNames;
+  }
+
+  /**
+   * Registers this controller under the online controllers path as an
+   * ephemeral node, first waiting for any stale registration (e.g. from a
+   * previous process whose session has not yet expired) to be released.
+   */
+  private void registerMyself() {
+    try {
+      String onlineControllerPath = ZookeeperPathConstants.getOnlineControllersPath() + "/" + _nodeName;
+      while (_zookeeper.exists(onlineControllerPath, false) != null) {
+        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", _nodeName,
+            onlineControllerPath);
+        Thread.sleep(3000);
+      }
+      String version = BlurUtil.getVersion();
+      // NOTE(review): getBytes() uses the platform default charset — confirm
+      // whether readers of this node expect a specific encoding.
+      _zookeeper.create(onlineControllerPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      // BUGFIX: restore the interrupt flag so callers further up the stack
+      // can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Shuts the controller down: stops the worker pool and closes every
+   * zookeeper watcher. Safe to call more than once; only the first call
+   * performs the shutdown.
+   */
+  public synchronized void close() {
+    // compareAndSet makes the check-then-set atomic, keeping close()
+    // idempotent even if the synchronized guard is ever removed.
+    if (_closed.compareAndSet(false, true)) {
+      _running.set(false);
+      _executor.shutdownNow();
+      close(_watchForClusters);
+      close(_watchForOnlineShardsPerCluster.values());
+      close(_watchForOnlineShardsPerClusterExistance.values());
+      close(_watchForTablesPerCluster.values());
+      close(_watchForTablesPerClusterExistance.values());
+    }
+  }
+
+  /** Closes every closeable in the collection, logging failures rather than throwing. */
+  private void close(Collection<? extends Closeable> closableLst) {
+    for (Closeable c : closableLst) {
+      close(c);
+    }
+  }
+
+  /**
+   * Closes a single closeable, logging any IO failure instead of propagating
+   * it so one bad watcher cannot abort the rest of the shutdown.
+   */
+  private void close(Closeable closeable) {
+    if (closeable == null) {
+      // BUGFIX: close() can run before init() has assigned _watchForClusters;
+      // guard against the resulting null rather than throwing NPE mid-shutdown.
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.error("Unknown", e);
+    }
+  }
+
+  /**
+   * Executes a query by scattering it to the shard servers of the table's
+   * cluster and merging the results. The whole scatter/gather is retried up
+   * to _maxDefaultRetries times when the merged results are incomplete (not
+   * enough hits and not every shard reported).
+   */
+  @Override
+  public BlurResults query(final String table, final BlurQuery blurQuery) throws BlurException, TException {
+    checkTable(table);
+    String cluster = _clusterStatus.getCluster(true, table);
+    _queryChecker.checkQuery(blurQuery);
+    int shardCount = _clusterStatus.getShardCount(true, cluster, table);
+
+    OUTER: for (int retries = 0; retries < _maxDefaultRetries; retries++) {
+      try {
+        final AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
+
+        BlurQuery original = new BlurQuery(blurQuery);
+
+        BlurUtil.setStartTime(original);
+
+        // When the caller gave no selector, build a minimal one; rows are
+        // fetched separately in convertToBlurResults, so the selector is
+        // stripped off the query sent to the shards below.
+        Selector selector = blurQuery.getSelector();
+        if (selector == null) {
+          selector = new Selector();
+          selector.setColumnFamiliesToFetch(EMPTY_SET);
+          selector.setColumnsToFetch(EMPTY_MAP);
+          if (!blurQuery.simpleQuery.superQueryOn) {
+            selector.setRecordOnly(true);
+          }
+        }
+        blurQuery.setSelector(null);
+
+        BlurCommand<BlurResultIterable> command = new BlurCommand<BlurResultIterable>() {
+          @Override
+          public BlurResultIterable call(Client client, Connection connection) throws BlurException, TException {
+            return new BlurResultIterableClient(connection, client, table, blurQuery, facetCounts, _remoteFetchCount);
+          }
+
+          @Override
+          public BlurResultIterable call(Client client) throws BlurException, TException {
+            // Detached-client commands use the two-argument overload above.
+            throw new RuntimeException("Won't be called.");
+          }
+        };
+
+        command.setDetachClient(true);
+
+        MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
+        BlurResultIterable hitsIterable = null;
+        try {
+          hitsIterable = scatterGather(getCluster(table), command, merger);
+          BlurResults results = convertToBlurResults(hitsIterable, blurQuery, facetCounts, _executor, selector, table);
+          if (!validResults(results, shardCount, blurQuery)) {
+            // Incomplete results: back off and rerun the whole query.
+            BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
+            continue OUTER;
+          }
+          return results;
+        } finally {
+          if (hitsIterable != null) {
+            hitsIterable.close();
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
+        throw new BException("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
+      }
+    }
+    throw new BlurException("Query could not be completed.", null);
+  }
+
+  /**
+   * Materializes a {@link BlurResults} from the merged hit iterable: skips to
+   * the requested window, copies facet counts, and — when a selector is
+   * supplied — fetches each hit's row in parallel on the given executor.
+   */
+  public BlurResults convertToBlurResults(BlurResultIterable hitsIterable, BlurQuery query,
+      AtomicLongArray facetCounts, ExecutorService executor, Selector selector, final String table)
+      throws InterruptedException, ExecutionException {
+    BlurResults results = new BlurResults();
+    results.setTotalResults(hitsIterable.getTotalResults());
+    results.setShardInfo(hitsIterable.getShardInfo());
+    if (query.minimumNumberOfResults > 0) {
+      // Page to [start, start + fetch) of the merged hit stream.
+      hitsIterable.skipTo(query.start);
+      int count = 0;
+      Iterator<BlurResult> iterator = hitsIterable.iterator();
+      while (iterator.hasNext() && count < query.fetch) {
+        results.addToResults(iterator.next());
+        count++;
+      }
+    }
+    if (results.results == null) {
+      results.results = new ArrayList<BlurResult>();
+    }
+    if (facetCounts != null) {
+      results.facetCounts = BlurUtil.toList(facetCounts);
+    }
+    if (selector != null) {
+      // Submit all row fetches first, then collect, so they run in parallel.
+      List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
+      for (int i = 0; i < results.results.size(); i++) {
+        final LazyBlurResult result = (LazyBlurResult) results.results.get(i);
+        final Selector s = new Selector(selector);
+        s.setLocationId(result.locationId);
+        futures.add(executor.submit(new Callable<FetchResult>() {
+          @Override
+          public FetchResult call() throws Exception {
+            return result.fetchRow(table, s);
+          }
+        }));
+      }
+      for (int i = 0; i < results.results.size(); i++) {
+        Future<FetchResult> future = futures.get(i);
+        BlurResult result = results.results.get(i);
+        result.setFetchResult(future.get());
+        // The location id is internal routing state; strip it from the response.
+        result.setLocationId(null);
+      }
+    }
+    results.query = query;
+    results.query.selector = selector;
+    return results;
+  }
+
+  /**
+   * A result set is valid when it either met the query's minimum number of
+   * results or every shard reported in.
+   */
+  private boolean validResults(BlurResults results, int shardCount, BlurQuery query) {
+    boolean enoughResults = results.totalResults >= query.minimumNumberOfResults;
+    boolean allShardsReported = results.getShardInfoSize() == shardCount;
+    return enoughResults || allShardsReported;
+  }
+
+  /**
+   * Fetches a single row/record by routing the selector to the shard server
+   * responsible for it; retry/backoff is handled by the client wrapper.
+   */
+  @Override
+  public FetchResult fetchRow(final String table, final Selector selector) throws BlurException, TException {
+    checkTable(table);
+    IndexManager.validSelector(selector);
+    String clientHostnamePort = null;
+    try {
+      clientHostnamePort = getNode(table, selector);
+      return _client.execute(clientHostnamePort, new BlurCommand<FetchResult>() {
+        @Override
+        public FetchResult call(Client client) throws BlurException, TException {
+          return client.fetchRow(table, selector);
+        }
+      }, _maxFetchRetries, _fetchDelay, _maxFetchDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector,
+          clientHostnamePort);
+      throw new BException("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table,
+          selector, clientHostnamePort);
+    }
+  }
+
+  /**
+   * Broadcasts a cancel for the given query uuid to every shard server of the
+   * table's cluster; no result is gathered.
+   */
+  @Override
+  public void cancelQuery(final String table, final long uuid) throws BlurException, TException {
+    checkTable(table);
+    try {
+      scatter(getCluster(table), new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.cancelQuery(table, uuid);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
+      throw new BException("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
+    }
+  }
+
+  /**
+   * Gathers the currently running queries from every shard server of the
+   * table's cluster and merges them into one list.
+   */
+  @Override
+  public List<BlurQueryStatus> currentQueries(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<BlurQueryStatus>>() {
+        @Override
+        public List<BlurQueryStatus> call(Client client) throws BlurException, TException {
+          return client.currentQueries(table);
+        }
+      }, new MergerQueryStatus(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get current searches [{0}]", e, table);
+      throw new BException("Unknown error while trying to get current searches [{0}]", e, table);
+    }
+  }
+
+  /**
+   * Gathers the query status uuids from every shard server of the table's
+   * cluster, de-duplicating them (a query may run on many shards) via a Set.
+   */
+  @Override
+  public List<Long> queryStatusIdList(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<Long>>() {
+        @Override
+        public List<Long> call(Client client) throws BlurException, TException {
+          return client.queryStatusIdList(table);
+        }
+      }, new Merger<List<Long>>() {
+        @Override
+        public List<Long> merge(BlurExecutorCompletionService<List<Long>> service) throws BlurException {
+          Set<Long> result = new HashSet<Long>();
+          while (service.getRemainingCount() > 0) {
+            Future<List<Long>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+            List<Long> ids = service.getResultThrowException(future);
+            result.addAll(ids);
+          }
+          return new ArrayList<Long>(result);
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status ids for table [{0}]", e, table);
+      throw new BException("Unknown error while trying to get query status ids for table [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public BlurQueryStatus queryStatusById(final String table, final long uuid) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<BlurQueryStatus>() {
+        @Override
+        public BlurQueryStatus call(Client client) throws BlurException, TException {
+          return client.queryStatusById(table, uuid);
+        }
+      }, new MergerQueryStatusSingle(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get query status [{0}]", e, table, uuid);
+      throw new BException("Unknown error while trying to get query status [{0}]", e, table, uuid);
+    }
+  }
+
+  /**
+   * Gathers table statistics from every shard server of the table's cluster
+   * and merges them into one aggregate.
+   */
+  @Override
+  public TableStats tableStats(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<TableStats>() {
+        @Override
+        public TableStats call(Client client) throws BlurException, TException {
+          // NOTE(review): delegates to client.getTableStats while this method
+          // is named tableStats — confirm the Iface deliberately exposes both.
+          return client.getTableStats(table);
+        }
+      }, new MergerTableStats(_defaultParallelCallTimeout));
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [{0}]", e, table);
+      throw new BException("Unknown error while trying to get table stats [{0}]", e, table);
+    }
+  }
+
+  /**
+   * Returns the shard -> server mapping for the table from the most recently
+   * published layout snapshot; empty (never null) when no layout has been
+   * computed for the table yet.
+   */
+  @Override
+  public Map<String, String> shardServerLayout(String table) throws BlurException, TException {
+    checkTable(table);
+    Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+    return tableLayout == null ? new HashMap<String, String>() : tableLayout;
+  }
+
+  /**
+   * Gathers the per-shard state maps from every shard server of the table's
+   * cluster and merges them into shard -> (server -> state).
+   */
+  // NOTE(review): unlike the sibling methods, this one does not call
+  // checkTable(table) first — confirm whether that omission is intentional.
+  @Override
+  public Map<String, Map<String, ShardState>> shardServerLayoutState(final String table) throws BlurException,
+      TException {
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Map<String, Map<String, ShardState>>>() {
+        @Override
+        public Map<String, Map<String, ShardState>> call(Client client) throws BlurException, TException {
+          try {
+            return client.shardServerLayoutState(table);
+          } catch (BlurException e) {
+            // NOTE(review): "UNKOWN" is a typo, but it is a runtime log string.
+            LOG.error("UNKOWN error from shard server", e);
+            throw e;
+          }
+        }
+      }, new Merger<Map<String, Map<String, ShardState>>>() {
+        @Override
+        public Map<String, Map<String, ShardState>> merge(
+            BlurExecutorCompletionService<Map<String, Map<String, ShardState>>> service) throws BlurException {
+          Map<String, Map<String, ShardState>> result = new HashMap<String, Map<String, ShardState>>();
+          while (service.getRemainingCount() > 0) {
+            Future<Map<String, Map<String, ShardState>>> future = service.poll(_defaultParallelCallTimeout,
+                TimeUnit.MILLISECONDS, true, table);
+            Map<String, Map<String, ShardState>> shardResult = service.getResultThrowException(future, table);
+            // Union the inner maps so state from every server is retained.
+            for (Entry<String, Map<String, ShardState>> entry : shardResult.entrySet()) {
+              Map<String, ShardState> map = result.get(entry.getKey());
+              if (map == null) {
+                map = new HashMap<String, ShardState>();
+                result.put(entry.getKey(), map);
+              }
+              map.putAll(entry.getValue());
+            }
+          }
+          return result;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get shard server layout [{0}]", e, table);
+      throw new BException("Unknown error while trying to get shard server layout [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value)
+      throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Long>() {
+        @Override
+        public Long call(Client client) throws BlurException, TException {
+          return client.recordFrequency(table, columnFamily, columnName, value);
+        }
+      }, new Merger<Long>() {
+
+        @Override
+        public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
+          Long total = 0L;
+          while (service.getRemainingCount() > 0) {
+            Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
+                columnFamily, columnName, value);
+            total += service.getResultThrowException(future, table, columnFamily, columnName, value);
+          }
+          return total;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily,
+          columnName, value);
+      throw new BException("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table,
+          columnFamily, columnName, value);
+    }
+  }
+
+  @Override
+  public Schema schema(final String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Schema>() {
+        @Override
+        public Schema call(Client client) throws BlurException, TException {
+          return client.schema(table);
+        }
+      }, new Merger<Schema>() {
+        @Override
+        public Schema merge(BlurExecutorCompletionService<Schema> service) throws BlurException {
+          Schema result = null;
+          while (service.getRemainingCount() > 0) {
+            Future<Schema> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table);
+            Schema schema = service.getResultThrowException(future, table);
+            if (result == null) {
+              result = schema;
+            } else {
+              result = BlurControllerServer.merge(result, schema);
+            }
+          }
+          return result;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to schema table [{0}]", e, table);
+      throw new BException("Unknown error while trying to schema table [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public List<String> terms(final String table, final String columnFamily, final String columnName,
+      final String startWith, final short size) throws BlurException, TException {
+    checkTable(table);
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<List<String>>() {
+        @Override
+        public List<String> call(Client client) throws BlurException, TException {
+          return client.terms(table, columnFamily, columnName, startWith, size);
+        }
+      }, new Merger<List<String>>() {
+        @Override
+        public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
+          TreeSet<String> terms = new TreeSet<String>();
+          while (service.getRemainingCount() > 0) {
+            Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
+                columnFamily, columnName, startWith, size);
+            terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
+          }
+          return new ArrayList<String>(terms).subList(0, Math.min(terms.size(), size));
+        }
+      });
+    } catch (Exception e) {
+      LOG.error(
+          "Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]",
+          e, table, columnFamily, columnName, startWith, size);
+      throw new BException(
+          "Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]",
+          e, table, columnFamily, columnName, startWith, size);
+    }
+  }
+
+  private String getNode(String table, Selector selector) throws BlurException, TException {
+    Map<String, String> layout = shardServerLayout(table);
+    String locationId = selector.locationId;
+    if (locationId != null) {
+      String shard = locationId.substring(0, locationId.indexOf('/'));
+      return layout.get(shard);
+    }
+    int numberOfShards = getShardCount(table);
+    if (selector.rowId != null) {
+      String shardName = MutationHelper.getShardName(table, selector.rowId, numberOfShards, _blurPartitioner);
+      return layout.get(shardName);
+    }
+    throw new BlurException("Selector is missing both a locationid and a rowid, one is needed.", null);
+  }
+
+  private <R> R scatterGather(String cluster, final BlurCommand<R> command, Merger<R> merger) throws Exception {
+    return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster),
+        new ParallelCall<String, R>() {
+          @Override
+          public R call(String hostnamePort) throws BlurException, TException, IOException {
+            return _client.execute(hostnamePort, command.clone(), _maxDefaultRetries, _defaultDelay, _maxDefaultDelay);
+          }
+        }).merge(merger);
+  }
+
+  private <R> void scatter(String cluster, BlurCommand<R> command) throws Exception {
+    scatterGather(cluster, command, new Merger<R>() {
+      @Override
+      public R merge(BlurExecutorCompletionService<R> service) throws BlurException {
+        while (service.getRemainingCount() > 0) {
+          Future<R> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+          service.getResultThrowException(future);
+        }
+        return null;
+      }
+    });
+  }
+
+  private String getCluster(String table) throws BlurException, TException {
+    TableDescriptor describe = describe(table);
+    if (describe == null) {
+      throw new BlurException("Table [" + table + "] not found.", null);
+    }
+    return describe.cluster;
+  }
+
+  public static Schema merge(Schema result, Schema schema) {
+    Map<String, Set<String>> destColumnFamilies = result.columnFamilies;
+    Map<String, Set<String>> srcColumnFamilies = schema.columnFamilies;
+    for (String srcColumnFamily : srcColumnFamilies.keySet()) {
+      Set<String> destColumnNames = destColumnFamilies.get(srcColumnFamily);
+      Set<String> srcColumnNames = srcColumnFamilies.get(srcColumnFamily);
+      if (destColumnNames == null) {
+        destColumnFamilies.put(srcColumnFamily, srcColumnNames);
+      } else {
+        destColumnNames.addAll(srcColumnNames);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void mutate(final RowMutation mutation) throws BlurException, TException {
+    checkTable(mutation.table);
+    checkForUpdates(mutation.table);
+    try {
+      MutationHelper.validateMutation(mutation);
+      String table = mutation.getTable();
+
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BlurException("Cannot update data while shard is missing", null);
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      _client.execute(node, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.mutate(mutation);
+          return null;
+        }
+      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during mutation of [{0}]", e, mutation);
+      throw new BException("Unknown error during mutation of [{0}]", e, mutation);
+    }
+  }
+
+  private int getShardCount(String table) throws BlurException, TException {
+    Integer numberOfShards = _tableShardCountMap.get(table);
+    if (numberOfShards == null) {
+      TableDescriptor descriptor = describe(table);
+      numberOfShards = descriptor.shardCount;
+      _tableShardCountMap.put(table, numberOfShards);
+    }
+    return numberOfShards;
+  }
+
+  @Override
+  public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    for (RowMutation mutation : mutations) {
+      MutationHelper.validateMutation(mutation);
+    }
+    Map<String, List<RowMutation>> batches = new HashMap<String, List<RowMutation>>();
+    for (RowMutation mutation : mutations) {
+      checkTable(mutation.table);
+      checkForUpdates(mutation.table);
+
+      MutationHelper.validateMutation(mutation);
+      String table = mutation.getTable();
+
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BlurException("Cannot update data while shard is missing", null);
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      List<RowMutation> list = batches.get(node);
+      if (list == null) {
+        list = new ArrayList<RowMutation>();
+        batches.put(node, list);
+      }
+      list.add(mutation);
+
+    }
+
+    for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
+      String node = entry.getKey();
+      final List<RowMutation> mutationsLst = entry.getValue();
+      try {
+        _client.execute(node, new BlurCommand<Void>() {
+          @Override
+          public Void call(Client client) throws BlurException, TException {
+            client.mutateBatch(mutationsLst);
+            return null;
+          }
+        }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+      } catch (Exception e) {
+        LOG.error("Unknown error during mutations of [{0}]", e, mutationsLst);
+        throw new BException("Unknown error during mutations of [{0}]", e, mutationsLst);
+      }
+    }
+  }
+
  /** Sets the node name this controller server reports as. */
  public void setNodeName(String nodeName) {
    _nodeName = nodeName;
  }

  /** Number of rows/records fetched per remote call. */
  public int getRemoteFetchCount() {
    return _remoteFetchCount;
  }

  /** Sets the number of rows/records fetched per remote call. */
  public void setRemoteFetchCount(int remoteFetchCount) {
    _remoteFetchCount = remoteFetchCount;
  }

  /** Max time-to-live used by the query cache (units per caller convention — presumably ms; confirm). */
  public long getMaxTimeToLive() {
    return _maxTimeToLive;
  }

  /** Sets the query-cache max time-to-live. */
  public void setMaxTimeToLive(long maxTimeToLive) {
    _maxTimeToLive = maxTimeToLive;
  }

  /** Maximum number of entries retained in the query cache. */
  public int getMaxQueryCacheElements() {
    return _maxQueryCacheElements;
  }

  /** Sets the maximum number of entries retained in the query cache. */
  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
    _maxQueryCacheElements = maxQueryCacheElements;
  }

  /** Injects the checker that validates incoming BlurQuery objects. */
  public void setQueryChecker(BlurQueryChecker queryChecker) {
    _queryChecker = queryChecker;
  }

  /** Sets the executor thread count used for parallel shard calls. */
  public void setThreadCount(int threadCount) {
    _threadCount = threadCount;
  }

  // Retry counts: how many times a fetch / mutate / other remote call is
  // re-attempted before the failure is propagated.
  public void setMaxFetchRetries(int maxFetchRetries) {
    _maxFetchRetries = maxFetchRetries;
  }

  public void setMaxMutateRetries(int maxMutateRetries) {
    _maxMutateRetries = maxMutateRetries;
  }

  public void setMaxDefaultRetries(int maxDefaultRetries) {
    _maxDefaultRetries = maxDefaultRetries;
  }

  // Initial back-off delays between retries for each call category.
  public void setFetchDelay(long fetchDelay) {
    _fetchDelay = fetchDelay;
  }

  public void setMutateDelay(long mutateDelay) {
    _mutateDelay = mutateDelay;
  }

  public void setDefaultDelay(long defaultDelay) {
    _defaultDelay = defaultDelay;
  }

  // Upper bounds on the back-off delay for each call category.
  public void setMaxFetchDelay(long maxFetchDelay) {
    _maxFetchDelay = maxFetchDelay;
  }

  public void setMaxMutateDelay(long maxMutateDelay) {
    _maxMutateDelay = maxMutateDelay;
  }

  public void setMaxDefaultDelay(long maxDefaultDelay) {
    _maxDefaultDelay = maxDefaultDelay;
  }

  /** The client used to reach shard servers. */
  public BlurClient getClient() {
    return _client;
  }

  /** Injects the client used to reach shard servers. */
  public void setClient(BlurClient client) {
    _client = client;
  }
+
+  @Override
+  public void optimize(final String table, final int numberOfSegmentsPerShard) throws BlurException, TException {
+    checkTable(table);
+    try {
+      scatter(getCluster(table), new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.optimize(table, numberOfSegmentsPerShard);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table,
+          numberOfSegmentsPerShard);
+      throw new BException("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table,
+          numberOfSegmentsPerShard);
+    }
+  }
+
+}
\ No newline at end of file


Mime
View raw message