incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/4] git commit: Reworking scripts to remove the controller scripts and shard server terminology.
Date Sun, 02 Dec 2012 04:23:16 GMT
Reworking scripts to remove the controller scripts and shard server terminology.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ed671149
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ed671149
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ed671149

Branch: refs/heads/0.2-dev-removing-old-thrift
Commit: ed671149bd1fdfecdf8fa0e96f451cce3aae115a
Parents: fab00ff
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Dec 1 21:08:40 2012 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Dec 1 21:08:40 2012 -0500

----------------------------------------------------------------------
 bin/controllers.sh                                 |   34 -
 bin/shards.sh                                      |   34 -
 bin/start-all.sh                                   |    5 +-
 bin/start-controller-server.sh                     |   41 -
 bin/start-controllers.sh                           |   24 -
 bin/start-shard-server.sh                          |   43 -
 bin/start-shards.sh                                |   25 -
 bin/stop-all.sh                                    |    4 +-
 bin/stop-controller-server.sh                      |   38 -
 bin/stop-controllers.sh                            |   24 -
 bin/stop-shard-server.sh                           |   38 -
 bin/stop-shards.sh                                 |   24 -
 .../java/org/apache/blur/thrift/BlurServer.java    |  634 +++++++++++++++
 .../org/apache/blur/thrift/BlurShardServer.java    |  634 ---------------
 .../org/apache/blur/thrift/ThriftBlurServer.java   |  292 +++++++
 .../apache/blur/thrift/ThriftBlurShardServer.java  |  292 -------
 .../src/test/java/org/apache/blur/MiniCluster.java |    4 +-
 .../apache/blur/search/RandomSuperQueryTest.java   |   99 ++-
 .../java/org/apache/blur/gui/HttpJettyServer.java  |   13 +-
 19 files changed, 998 insertions(+), 1304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/controllers.sh
----------------------------------------------------------------------
diff --git a/bin/controllers.sh b/bin/controllers.sh
deleted file mode 100755
index 37c7108..0000000
--- a/bin/controllers.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-export HOSTLIST="${BLUR_HOME_CONF}/controllers"
-
-for controller in `cat "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
- ssh $BLUR_SSH_OPTS $controller $"${@// /\\ }" \
-   2>&1 | sed "s/^/$controller: /" &
- if [ "$BLUR_CONTROLLER_SLEEP" != "" ]; then
-   sleep $BLUR_CONTROLLER_SLEEP
- fi
-done
-
-wait
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/shards.sh
----------------------------------------------------------------------
diff --git a/bin/shards.sh b/bin/shards.sh
deleted file mode 100755
index 6b22678..0000000
--- a/bin/shards.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-export HOSTLIST="${BLUR_HOME_CONF}/shards"
-
-for shard in `cat "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
- ssh $BLUR_SSH_OPTS $shard $"${@// /\\ }" \
-   2>&1 | sed "s/^/$shard: /" &
- if [ "$BLUR_SHARD_SLEEP" != "" ]; then
-   sleep $BLUR_SHARD_SLEEP
- fi
-done
-
-wait
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/start-all.sh
----------------------------------------------------------------------
diff --git a/bin/start-all.sh b/bin/start-all.sh
index 7da121a..639ab98 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -20,5 +20,6 @@ bin=`cd "$bin"; pwd`
 
 . "$bin"/blur-config.sh
 
-$BLUR_HOME/bin/start-shards.sh
-$BLUR_HOME/bin/start-controllers.sh
\ No newline at end of file
+$BLUR_HOME/bin/shards.sh $BLUR_HOME/bin/start-shard-server.sh
+
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/start-controller-server.sh
----------------------------------------------------------------------
diff --git a/bin/start-controller-server.sh b/bin/start-controller-server.sh
deleted file mode 100755
index 750152b..0000000
--- a/bin/start-controller-server.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-INSTANCE=0
-while [  $INSTANCE -lt $BLUR_NUMBER_OF_CONTROLLER_SERVER_INSTANCES_PER_MACHINE ]; do
-  PID_FILE=$BLUR_HOME/pids/controller-$INSTANCE.pid
-
-  if [ -f $PID_FILE ]; then
-    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
-      echo Controller server already running as process `cat $PID_FILE`.  Stop it first.
-      let INSTANCE=INSTANCE+1
-      continue
-    fi
-  fi
-
-  PROC_NAME=blur-controller-server-$HOSTNAME-$INSTANCE
-  nohup "$JAVA_HOME"/bin/java -Dblur.name=$PROC_NAME -Djava.library.path=$JAVA_LIBRARY_PATH -Dblur-controller-$INSTANCE $BLUR_CONTROLLER_JVM_OPTIONS -Dblur.logs.dir=$BLUR_LOGS -Dblur.log.file=$PROC_NAME.log -cp $BLUR_CLASSPATH org.apache.blur.thrift.ThriftBlurControllerServer -s $INSTANCE > "$BLUR_LOGS/$PROC_NAME.out" 2>&1 < /dev/null &
-  echo $! > $PID_FILE
-  echo Controller [$INSTANCE] starting as process `cat $PID_FILE`.
-
-  let INSTANCE=INSTANCE+1 
-done
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/start-controllers.sh
----------------------------------------------------------------------
diff --git a/bin/start-controllers.sh b/bin/start-controllers.sh
deleted file mode 100755
index c9b77f1..0000000
--- a/bin/start-controllers.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-$BLUR_HOME/bin/controllers.sh $BLUR_HOME/bin/start-controller-server.sh
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/start-shard-server.sh
----------------------------------------------------------------------
diff --git a/bin/start-shard-server.sh b/bin/start-shard-server.sh
deleted file mode 100755
index 99a4611..0000000
--- a/bin/start-shard-server.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-INSTANCE=0
-while [  $INSTANCE -lt $BLUR_NUMBER_OF_SHARD_SERVER_INSTANCES_PER_MACHINE ]; do
-  PID_FILE=$BLUR_HOME/pids/shard-$INSTANCE.pid
-
-  if [ -f $PID_FILE ]; then
-    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
-      echo Shard server already running as process `cat $PID_FILE`.  Stop it first.
-      let INSTANCE=INSTANCE+1
-      continue
-    fi
-  fi
-
-  PROC_NAME=blur-shard-server-$HOSTNAME-$INSTANCE
-  nohup "$JAVA_HOME"/bin/java -Dblur.name=$PROC_NAME -Djava.library.path=$JAVA_LIBRARY_PATH -Dblur-shard-$INSTANCE $BLUR_SHARD_JVM_OPTIONS -Dblur.logs.dir=$BLUR_LOGS -Dblur.log.file=$PROC_NAME.log -cp $BLUR_CLASSPATH org.apache.blur.thrift.ThriftBlurShardServer -s $INSTANCE > "$BLUR_LOGS/$PROC_NAME.out" 2>&1 < /dev/null &
-  echo $! > $PID_FILE
-  echo Shard [$INSTANCE] starting as process `cat $PID_FILE`.
-
-  let INSTANCE=INSTANCE+1 
-done
-
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/start-shards.sh
----------------------------------------------------------------------
diff --git a/bin/start-shards.sh b/bin/start-shards.sh
deleted file mode 100755
index 639ab98..0000000
--- a/bin/start-shards.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-$BLUR_HOME/bin/shards.sh $BLUR_HOME/bin/start-shard-server.sh
-
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/stop-all.sh
----------------------------------------------------------------------
diff --git a/bin/stop-all.sh b/bin/stop-all.sh
index 8c758fb..5e1342d 100755
--- a/bin/stop-all.sh
+++ b/bin/stop-all.sh
@@ -20,5 +20,5 @@ bin=`cd "$bin"; pwd`
 
 . "$bin"/blur-config.sh
 
-$BLUR_HOME/bin/stop-controllers.sh
-$BLUR_HOME/bin/stop-shards.sh
+$BLUR_HOME/bin/shards.sh $BLUR_HOME/bin/stop-shard-server.sh
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/stop-controller-server.sh
----------------------------------------------------------------------
diff --git a/bin/stop-controller-server.sh b/bin/stop-controller-server.sh
deleted file mode 100755
index 2595e19..0000000
--- a/bin/stop-controller-server.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-INSTANCE=0
-while [  $INSTANCE -lt $BLUR_NUMBER_OF_CONTROLLER_SERVER_INSTANCES_PER_MACHINE ]; do
-  PID_FILE=$BLUR_HOME/pids/controller-$INSTANCE.pid
-
-  if [ -f $PID_FILE ]; then
-    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
-      echo Stopping Controller [$INSTANCE] server with pid [`cat $PID_FILE`].
-      kill `cat $PID_FILE`
-    else
-      echo No Controller [$INSTANCE] server to stop
-    fi
-  else
-    echo No Controller [$INSTANCE] server to stop
-  fi
-  let INSTANCE=INSTANCE+1 
-done
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/stop-controllers.sh
----------------------------------------------------------------------
diff --git a/bin/stop-controllers.sh b/bin/stop-controllers.sh
deleted file mode 100755
index 5a44096..0000000
--- a/bin/stop-controllers.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-$BLUR_HOME/bin/controllers.sh $BLUR_HOME/bin/stop-controller-server.sh
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/stop-shard-server.sh
----------------------------------------------------------------------
diff --git a/bin/stop-shard-server.sh b/bin/stop-shard-server.sh
deleted file mode 100755
index d850ffb..0000000
--- a/bin/stop-shard-server.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-INSTANCE=0
-while [  $INSTANCE -lt $BLUR_NUMBER_OF_SHARD_SERVER_INSTANCES_PER_MACHINE ]; do
-  PID_FILE=$BLUR_HOME/pids/shard-$INSTANCE.pid
-
-  if [ -f $PID_FILE ]; then
-    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
-      echo Stopping Shard [$INSTANCE] server with pid [`cat $PID_FILE`].
-      kill `cat $PID_FILE`
-    else
-      echo No Shard [$INSTANCE] server to stop
-    fi
-  else
-    echo No Shard [$INSTANCE] server to stop
-  fi
-  let INSTANCE=INSTANCE+1 
-done
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/bin/stop-shards.sh
----------------------------------------------------------------------
diff --git a/bin/stop-shards.sh b/bin/stop-shards.sh
deleted file mode 100755
index 5e1342d..0000000
--- a/bin/stop-shards.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/blur-config.sh
-
-$BLUR_HOME/bin/shards.sh $BLUR_HOME/bin/stop-shard-server.sh
-

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
new file mode 100644
index 0000000..3bf8d2a
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
@@ -0,0 +1,634 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.thrift.BlurServer.SearchAction.ACTION;
+import org.apache.blur.thrift.TableLayout.TYPE;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Generation;
+import org.apache.blur.thrift.generated.LiveSchema;
+import org.apache.blur.thrift.generated.MutateOptions;
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.thrift.generated.QueryStatus;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.blur.thrift.generated.UpdatePackage;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.ThriftLuceneConversion;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.thrift.TException;
+
+public class BlurServer extends TableAdmin implements Iface {
+
+  private static final Log LOG = LogFactory.getLog(BlurServer.class);
+  private IndexManager _indexManager;
+  private IndexServer _indexServer;
+  private boolean _closed;
+  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
+  private int _maxQueryCacheElements = 128;
+  private ExecutorService _dataFetch;
+  private ExecutorService _indexSearcherExecutor;
+  private ExecutorService _searchExecutor;
+  private int _dataFetchThreadCount = 32;
+  private TableLayout _layout;
+
+  public void init() throws BlurException {
+    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
+    _searchExecutor = Executors.newThreadPool("search-", 16);
+
+    if (_configuration == null) {
+      throw new BException("Configuration must be set before initialization.");
+    }
+    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
+    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
+    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
+  }
+
+  @Override
+  public TableStats tableStats(String table) throws BlurException, TException {
+    checkTable(table);
+    try {
+      TableStats tableStats = new TableStats();
+      // tableStats.tableName = table;
+      // tableStats.recordCount = _indexServer.getRecordCount(table);
+      // tableStats.rowCount = _indexServer.getRowCount(table);
+      tableStats.bytes = _indexServer.getTableSize(table);
+      tableStats.queries = 0;
+      return tableStats;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _indexManager.close();
+      _dataFetch.shutdownNow();
+    }
+  }
+
+  public long getMaxTimeToLive() {
+    return _maxTimeToLive;
+  }
+
+  public void setMaxTimeToLive(long maxTimeToLive) {
+    _maxTimeToLive = maxTimeToLive;
+  }
+
+  public int getMaxQueryCacheElements() {
+    return _maxQueryCacheElements;
+  }
+
+  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
+    _maxQueryCacheElements = maxQueryCacheElements;
+  }
+
+  public void setIndexManager(IndexManager indexManager) {
+    _indexManager = indexManager;
+  }
+
+  public void setIndexServer(IndexServer indexServer) {
+    _indexServer = indexServer;
+  }
+
+  public int getDataFetchThreadCount() {
+    return _dataFetchThreadCount;
+  }
+
+  public void setDataFetchThreadCount(int dataFetchThreadCount) {
+    _dataFetchThreadCount = dataFetchThreadCount;
+  }
+
+  public void setConfiguration(BlurConfiguration conf) {
+    _configuration = conf;
+  }
+
+  // New interface from this point
+
+  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
+
+  @Override
+  public Session openReadSession(String table) throws BlurException, TException {
+    String uuid = UUID.randomUUID().toString();
+    return newSession(table, uuid);
+  }
+
+  private Session newSession(String table, String uuid) throws BlurException {
+    checkTable(table);
+    BlurAnalyzer analyzer = _indexServer.getAnalyzer(table);
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
+    SessionInfo sessionInfo = new SessionInfo();
+    sessionInfo.setUuid(uuid);
+    sessionInfo.setAnalyzer(analyzer);
+    sessionInfo.setTableDescriptor(tableDescriptor);
+
+    for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
+      int index = BlurUtil.getShardIndex(entry.getKey());
+      try {
+        IndexReader indexReader = entry.getValue().getIndexReader();
+        // @TODO use new thread pool here
+        IndexSearcher indexSearcher = new IndexSearcher(indexReader, _searchExecutor);
+        sessionInfo.add(index, indexReader);
+        sessionInfo.add(index, indexSearcher);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to fetch index readers.", e);
+      }
+    }
+    sessions.put(uuid, sessionInfo);
+    return new Session(uuid, table);
+  }
+
+  @Override
+  public List<TopFieldDocs> search(final Session session, final QueryArgs queryArgs) throws BlurException, TException {
+    SessionInfo info = getSessionInfo(session);
+    if (info == null) {
+      newSession(session.getTableName(), session.getSessionId());
+      info = getSessionInfo(session);
+    }
+    try {
+      Map<Integer, IndexSearcher> searchers = info.getSearchers();
+      List<Integer> shardIndexes = queryArgs.getShardIndexes();
+      TableDescriptor tableDescriptor = info.getTableDescriptor();
+      Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
+
+      List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
+      Query query = ThriftLuceneConversion.toLuceneQuery(queryArgs.query);
+      Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
+      Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
+      ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
+      boolean doDocScores = queryArgs.isDoDocScores();
+      boolean doMaxScore = queryArgs.isDoMaxScore();
+      int numberToFetch = queryArgs.getNumberToFetch();
+      for (SearchAction action : searchersToSearch) {
+        final int shardIndex = action.shardIndex;
+        if (action.type == ACTION.LOCAL) {
+          SearchCallable searchCallable = new SearchCallable(shardIndex, action.indexSearcher, after, query, filter, sort, numberToFetch, doDocScores, doMaxScore);
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(searchCallable);
+          futures.add(future);
+        } else if (action.type == ACTION.REMOTE) {
+          // @TODO need to send only one call per server, instead of one for
+          // each shard server
+          final Connection connection = action.remoteServer;
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new Callable<TopFieldDocs>() {
+            @Override
+            public TopFieldDocs call() throws Exception {
+              List<TopFieldDocs> list = BlurClientManager.execute(connection, new BlurCommand<List<TopFieldDocs>>() {
+                @Override
+                public List<TopFieldDocs> call(Client client) throws BlurException, TException {
+                  QueryArgs remoteQueryArgs = new QueryArgs(queryArgs);
+                  remoteQueryArgs.addToShardIndexes(shardIndex);
+                  return client.search(session, remoteQueryArgs);
+                }
+              });
+              return list.iterator().next();
+            }
+          });
+          futures.add(future);
+        }
+      }
+
+      List<TopFieldDocs> result = new ArrayList<TopFieldDocs>(futures.size());
+      for (Future<TopFieldDocs> future : futures) {
+        result.add(future.get());
+      }
+      return result;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  static class SearchCallable implements Callable<TopFieldDocs> {
+    private final ScoreDoc after;
+    private final Sort sort;
+    private final Filter filter;
+    private final Query query;
+    private final IndexSearcher searcher;
+    private final int count;
+    private final boolean doDocScores;
+    private final boolean doMaxScore;
+    private final int shardIndex;
+
+    SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
+      this.after = after;
+      this.searcher = searcher;
+      this.query = query;
+      this.filter = filter;
+      this.sort = sort;
+      this.count = count;
+      this.doDocScores = doDocScores;
+      this.doMaxScore = doMaxScore;
+      this.shardIndex = shardIndex;
+    }
+
+    @Override
+    public TopFieldDocs call() throws Exception {
+      return addShardIndex(doSearch());
+    }
+
+    private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
+      topFieldDocs.setShardIndex(shardIndex);
+      return topFieldDocs;
+    }
+
+    private TopFieldDocs doSearch() throws IOException {
+      if (after == null) {
+        if (sort == null) {
+          return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
+        } else {
+          return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
+        }
+      } else {
+        if (sort == null) {
+          return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
+        } else {
+          return toThrift(setShardIndexTopFieldDocs(shardIndex,
+              (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
+        }
+      }
+    }
+  }
+
+  /**
+   * Describes how one shard is to be searched: either through an
+   * {@link IndexSearcher} (type LOCAL) or through a {@link Connection} to
+   * another server (type REMOTE). Which of the two is populated depends on the
+   * constructor used.
+   */
+  static class SearchAction {
+    enum ACTION {
+      LOCAL, REMOTE
+    }
+
+    ACTION type;
+    int shardIndex;
+    // Assigned only by the LOCAL constructor.
+    IndexSearcher indexSearcher;
+    // Assigned only by the REMOTE constructor.
+    Connection remoteServer;
+
+    /** Creates a LOCAL action backed by the given searcher. */
+    SearchAction(int shardIndex, IndexSearcher indexSearcher) {
+      this.shardIndex = shardIndex;
+      this.indexSearcher = indexSearcher;
+      this.type = ACTION.LOCAL;
+    }
+
+    /** Creates a REMOTE action that targets the given connection. */
+    SearchAction(int shardIndex, Connection remoteServer) {
+      this.shardIndex = shardIndex;
+      this.remoteServer = remoteServer;
+      this.type = ACTION.REMOTE;
+    }
+  }
+
+  /**
+   * Builds one {@link SearchAction} per requested shard. A shard that has an
+   * entry in {@code searchers} becomes a LOCAL action; any other shard becomes
+   * a REMOTE action using the connection returned by
+   * {@code getConnection}.
+   *
+   * @param tableDescriptor supplies the table name and shard count
+   * @param shardIndexes shards to search; {@code null} means every shard from
+   *          0 up to the table's shard count
+   * @param searchers searchers keyed by shard index
+   * @return the actions, in the order the shards were listed
+   */
+  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
+    String table = tableDescriptor.getName();
+    int shardCount = tableDescriptor.getShardCount();
+    List<Integer> targets = shardIndexes;
+    if (targets == null) {
+      // No explicit selection: search all shards of the table.
+      targets = new ArrayList<Integer>(shardCount);
+      for (int shard = 0; shard < shardCount; shard++) {
+        targets.add(shard);
+      }
+    }
+    Collection<SearchAction> actions = new ArrayList<SearchAction>();
+    for (Integer shard : targets) {
+      IndexSearcher local = searchers.get(shard);
+      if (local == null) {
+        actions.add(new SearchAction(shard, getConnection(table, shard)));
+      } else {
+        actions.add(new SearchAction(shard, local));
+      }
+    }
+    return actions;
+  }
+
+  /**
+   * Looks up the server-side state registered under the session's id.
+   *
+   * @return the stored {@link SessionInfo}, or whatever the {@code sessions}
+   *         map returns for an id it does not contain
+   */
+  private SessionInfo getSessionInfo(Session session) {
+    String sessionId = session.getSessionId();
+    return sessions.get(sessionId);
+  }
+
+  /**
+   * Fetches the documents stored at the given locations. Each location is
+   * decoded into a shard index and a document id; shards that have a searcher
+   * in this session are read directly, all others are forwarded through
+   * {@code forwardDoc}.
+   *
+   * @param session the read session the locations belong to
+   * @param docLocations encoded document locations; {@code null} elements are
+   *          rejected
+   * @param fieldsToLoad fields to load, passed through to the searcher or the
+   *          forwarded call
+   * @return the documents in the order of {@code docLocations}
+   * @throws BlurException if the session is not known to this server, a
+   *           location is {@code null}, or any lookup fails
+   */
+  @Override
+  public List<Document> doc(Session session, List<Long> docLocations, Set<String> fieldsToLoad) throws BlurException, TException {
+    try {
+      SessionInfo sessionInfo = getSessionInfo(session);
+      if (sessionInfo == null) {
+        // Without this check an unknown session surfaced as a
+        // NullPointerException, which was rethrown with a null message.
+        throw new BlurException("Session [" + session.getSessionId() + "] is not open on this server.", null);
+      }
+      Map<Integer, IndexSearcher> searchers = sessionInfo.getSearchers();
+      List<Document> result = new ArrayList<Document>();
+      for (Long docLocation : docLocations) {
+        if (docLocation == null) {
+          throw new BlurException("Null docLocation is not allowed.", null);
+        }
+        int shardIndex = BlurUtil.getShardIndex(docLocation);
+        int docId = BlurUtil.getDocumentId(docLocation);
+        IndexSearcher searcher = searchers.get(shardIndex);
+        if (searcher == null) {
+          // No searcher for this shard in the session: ask another server.
+          result.addAll(forwardDoc(session, shardIndex, docLocation, fieldsToLoad));
+        } else {
+          result.add(toThrift(searcher.document(docId, fieldsToLoad)));
+        }
+      }
+      return result;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Fetches a single document from the server returned by
+   * {@code getConnection} for the session's table and the given shard.
+   *
+   * @param session the read session, passed unchanged to the remote call
+   * @param shardIndex shard used to pick the target server
+   * @param docLocation the single location to fetch
+   * @param fieldsToLoad fields to load, passed unchanged to the remote call
+   * @return whatever the remote {@code doc} call returns for that location
+   */
+  private List<Document> forwardDoc(final Session session, int shardIndex, final Long docLocation, final Set<String> fieldsToLoad) throws BlurException, TException, IOException {
+    // TODO Make more efficient by making a single call to a server for many
+    // docLocations
+    String table = session.getTableName();
+    // A leftover System.out.println debug statement was removed here; it wrote
+    // one line to stdout for every forwarded document.
+    Connection connection = getConnection(table, shardIndex);
+    return BlurClientManager.execute(connection, new BlurCommand<List<Document>>() {
+      @Override
+      public List<Document> call(Client client) throws BlurException, TException {
+        return client.doc(session, Arrays.asList(docLocation), fieldsToLoad);
+      }
+    });
+  }
+
+  /**
+   * Closes a read session: removes its entry from the {@code sessions} map and
+   * releases the readers it holds.
+   *
+   * Closing a session that is unknown to this server (never opened here, or
+   * already closed) is a no-op. Previously that case threw a
+   * NullPointerException, and closed sessions were never removed from the map,
+   * so the map grew with every session opened.
+   */
+  @Override
+  public void closeReadSession(Session session) throws BlurException, TException {
+    SessionInfo sessionInfo = sessions.remove(session.getSessionId());
+    if (sessionInfo != null) {
+      sessionInfo.releaseReaders();
+    }
+  }
+
+  /**
+   * Adds documents to the shard named in {@code options}. If {@code getIndex}
+   * finds an index for that shard the documents are written to it and a single
+   * {@link Generation} is returned; otherwise the call is forwarded and the
+   * forwarded result is returned.
+   *
+   * @throws BlurException wrapping any failure, after logging it
+   */
+  @Override
+  public List<Generation> addDocuments(MutateOptions options, List<Document> documents) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex local = getIndex(table, shardIndex);
+      if (local != null) {
+        long written = local.addDocuments(waitToBeVisible, writeAheadLog, documents);
+        generations.add(new Generation(table, shardIndex, written));
+      } else {
+        List<Generation> forwarded = forwardAddDocuments(options, documents);
+        generations.addAll(forwarded);
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Waits, one generation at a time and in list order, until each generation
+   * is visible. Generations whose shard has an index according to
+   * {@code getIndex} are waited on directly; the rest are forwarded.
+   *
+   * @throws BlurException wrapping any failure, after logging it
+   */
+  @Override
+  public void blockUntilGenerationIsVisible(List<Generation> generations, boolean forceRefresh) throws BlurException, TException {
+    try {
+      for (Generation pending : generations) {
+        String table = pending.getTable();
+        int shardIndex = pending.getShardIndex();
+        BlurIndex local = getIndex(table, shardIndex);
+        if (local != null) {
+          local.blockUntilGenerationIsVisible(pending.getGeneration(), forceRefresh);
+          continue;
+        }
+        forwardBlockUntilGenerationIsVisible(pending, forceRefresh);
+      }
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Sends a {@code blockUntilGenerationIsVisible} call for one generation to
+   * the server returned by {@code getConnection} for the generation's table
+   * and shard.
+   */
+  private void forwardBlockUntilGenerationIsVisible(final Generation generation, final boolean forceRefresh) throws BlurException, TException, IOException {
+    Connection target = getConnection(generation.getTable(), generation.getShardIndex());
+    BlurCommand<Void> remoteWait = new BlurCommand<Void>() {
+      @Override
+      public Void call(Client client) throws BlurException, TException {
+        client.blockUntilGenerationIsVisible(Arrays.asList(generation), forceRefresh);
+        return null;
+      }
+    };
+    BlurClientManager.execute(target, remoteWait);
+  }
+
+  /**
+   * Deletes documents matching the given serialized queries from the shard
+   * named in {@code options}. If {@code getIndex} finds an index for that
+   * shard the delete is applied to it and a single {@link Generation} is
+   * returned; otherwise the call is forwarded.
+   *
+   * @throws BlurException wrapping any failure, after logging it
+   */
+  @Override
+  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<ByteBuffer> queries) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex local = getIndex(table, shardIndex);
+      if (local != null) {
+        ByteBuffer[] queryArray = queries.toArray(new ByteBuffer[queries.size()]);
+        long written = local.deleteDocuments(waitToBeVisible, writeAheadLog, queryArray);
+        generations.add(new Generation(table, shardIndex, written));
+      } else {
+        List<Generation> forwarded = forwardDeleteDocumentsByQueries(options, queries);
+        generations.addAll(forwarded);
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Deletes documents matching the given terms from the shard named in
+   * {@code options}. If {@code getIndex} finds an index for that shard the
+   * delete is applied to it and a single {@link Generation} is returned;
+   * otherwise the call is forwarded.
+   *
+   * @throws BlurException wrapping any failure, after logging it
+   */
+  @Override
+  public List<Generation> deleteDocuments(MutateOptions options, List<Term> terms) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex local = getIndex(table, shardIndex);
+      if (local != null) {
+        Term[] termArray = terms.toArray(new Term[terms.size()]);
+        long written = local.deleteDocuments(waitToBeVisible, writeAheadLog, termArray);
+        generations.add(new Generation(table, shardIndex, written));
+      } else {
+        List<Generation> forwarded = forwardDeleteDocuments(options, terms);
+        generations.addAll(forwarded);
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Applies the given update packages to the shard named in {@code options}.
+   * If {@code getIndex} finds an index for that shard the update is applied to
+   * it and a single {@link Generation} is returned; otherwise the call is
+   * forwarded.
+   *
+   * @throws BlurException wrapping any failure, after logging it
+   */
+  @Override
+  public List<Generation> updateDocuments(MutateOptions options, List<UpdatePackage> updatePackages) throws BlurException, TException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    boolean waitToBeVisible = options.isWaitToBeVisible();
+    boolean writeAheadLog = options.isWriteAheadLog();
+    List<Generation> generations = new ArrayList<Generation>();
+    try {
+      BlurIndex local = getIndex(table, shardIndex);
+      if (local != null) {
+        long written = local.updateDocuments(waitToBeVisible, writeAheadLog, updatePackages);
+        generations.add(new Generation(table, shardIndex, written));
+      } else {
+        List<Generation> forwarded = forwardUpdateDocuments(options, updatePackages);
+        generations.addAll(forwarded);
+      }
+      return generations;
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  private BlurIndex getIndex(String table, int shardIndex) throws BException {
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+    BlurIndex index = blurIndexes.get(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardIndex));
+    return index;
+  }
+
+  @Override
+  public List<Integer> serverLayout(String table, String server) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public void cancelQuery(Session session, long id) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public List<Long> queryStatusIdList(Session session) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public QueryStatus queryStatus(Session session, long id) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  @Override
+  public LiveSchema schema(String table) throws BlurException, TException {
+    throw new BlurException("Not implemented", null);
+  }
+
+  /** Returns the table layout currently used to pick servers for a shard. */
+  public TableLayout getLayout() {
+    return this._layout;
+  }
+
+  /** Replaces the table layout used to pick servers for a shard. */
+  public void setLayout(TableLayout layout) {
+    _layout = layout;
+  }
+
+  /**
+   * Sends an {@code addDocuments} call, with the arguments unchanged, to the
+   * server returned by {@code getConnection} for the table and shard in
+   * {@code options}.
+   */
+  private List<Generation> forwardAddDocuments(final MutateOptions options, final List<Document> documents) throws BlurException, TException, IOException {
+    Connection target = getConnection(options.getTable(), options.getShardIndex());
+    BlurCommand<List<Generation>> remoteAdd = new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.addDocuments(options, documents);
+      }
+    };
+    return BlurClientManager.execute(target, remoteAdd);
+  }
+
+  /**
+   * Sends an {@code updateDocuments} call, with the arguments unchanged, to
+   * the server returned by {@code getConnection} for the table and shard in
+   * {@code options}.
+   */
+  private List<Generation> forwardUpdateDocuments(final MutateOptions options, final List<UpdatePackage> updatePackages) throws BlurException, TException, IOException {
+    Connection target = getConnection(options.getTable(), options.getShardIndex());
+    BlurCommand<List<Generation>> remoteUpdate = new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.updateDocuments(options, updatePackages);
+      }
+    };
+    return BlurClientManager.execute(target, remoteUpdate);
+  }
+
+  /**
+   * Sends a {@code deleteDocumentsByQueries} call, with the arguments
+   * unchanged, to the server returned by {@code getConnection} for the table
+   * and shard in {@code options}.
+   */
+  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<ByteBuffer> queries) throws BlurException, TException, IOException {
+    Connection target = getConnection(options.getTable(), options.getShardIndex());
+    BlurCommand<List<Generation>> remoteDelete = new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.deleteDocumentsByQueries(options, queries);
+      }
+    };
+    return BlurClientManager.execute(target, remoteDelete);
+  }
+
+  /**
+   * Sends a {@code deleteDocuments} call, with the arguments unchanged, to the
+   * server returned by {@code getConnection} for the table and shard in
+   * {@code options}.
+   */
+  private List<Generation> forwardDeleteDocuments(final MutateOptions options, final List<Term> terms) throws BlurException, TException, IOException {
+    Connection target = getConnection(options.getTable(), options.getShardIndex());
+    BlurCommand<List<Generation>> remoteDelete = new BlurCommand<List<Generation>>() {
+      @Override
+      public List<Generation> call(Client client) throws BlurException, TException {
+        return client.deleteDocuments(options, terms);
+      }
+    };
+    return BlurClientManager.execute(target, remoteDelete);
+  }
+
+  /**
+   * Creates a connection to the server that the current layout reports for
+   * the given table and shard, requesting a {@code TYPE.WRITABLE} server.
+   */
+  private Connection getConnection(String table, int shardIndex) {
+    return new Connection(_layout.findServer(table, shardIndex, TYPE.WRITABLE));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
deleted file mode 100644
index 2fe9aca..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ /dev/null
@@ -1,634 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
-import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.IndexManager;
-import org.apache.blur.manager.IndexServer;
-import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thrift.BlurShardServer.SearchAction.ACTION;
-import org.apache.blur.thrift.TableLayout.TYPE;
-import org.apache.blur.thrift.commands.BlurCommand;
-import org.apache.blur.thrift.generated.Blur.Client;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Document;
-import org.apache.blur.thrift.generated.Generation;
-import org.apache.blur.thrift.generated.LiveSchema;
-import org.apache.blur.thrift.generated.MutateOptions;
-import org.apache.blur.thrift.generated.QueryArgs;
-import org.apache.blur.thrift.generated.QueryStatus;
-import org.apache.blur.thrift.generated.Session;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Term;
-import org.apache.blur.thrift.generated.TopFieldDocs;
-import org.apache.blur.thrift.generated.UpdatePackage;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.ThriftLuceneConversion;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.Filter;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
-import org.apache.thrift.TException;
-
-public class BlurShardServer extends TableAdmin implements Iface {
-
-  private static final Log LOG = LogFactory.getLog(BlurShardServer.class);
-  private IndexManager _indexManager;
-  private IndexServer _indexServer;
-  private boolean _closed;
-  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
-  private int _maxQueryCacheElements = 128;
-  private ExecutorService _dataFetch;
-  private ExecutorService _indexSearcherExecutor;
-  private ExecutorService _searchExecutor;
-  private int _dataFetchThreadCount = 32;
-  private TableLayout _layout;
-
-  public void init() throws BlurException {
-    _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
-    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
-    _searchExecutor = Executors.newThreadPool("search-", 16);
-
-    if (_configuration == null) {
-      throw new BException("Configuration must be set before initialization.");
-    }
-    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
-    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
-    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
-  }
-
-  @Override
-  public TableStats tableStats(String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      TableStats tableStats = new TableStats();
-      // tableStats.tableName = table;
-      // tableStats.recordCount = _indexServer.getRecordCount(table);
-      // tableStats.rowCount = _indexServer.getRowCount(table);
-      tableStats.bytes = _indexServer.getTableSize(table);
-      tableStats.queries = 0;
-      return tableStats;
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get table stats [table={0}]", e, table);
-      throw new BException(e.getMessage(), e);
-    }
-  }
-
-  public synchronized void close() {
-    if (!_closed) {
-      _closed = true;
-      _indexManager.close();
-      _dataFetch.shutdownNow();
-    }
-  }
-
-  public long getMaxTimeToLive() {
-    return _maxTimeToLive;
-  }
-
-  public void setMaxTimeToLive(long maxTimeToLive) {
-    _maxTimeToLive = maxTimeToLive;
-  }
-
-  public int getMaxQueryCacheElements() {
-    return _maxQueryCacheElements;
-  }
-
-  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
-    _maxQueryCacheElements = maxQueryCacheElements;
-  }
-
-  public void setIndexManager(IndexManager indexManager) {
-    _indexManager = indexManager;
-  }
-
-  public void setIndexServer(IndexServer indexServer) {
-    _indexServer = indexServer;
-  }
-
-  public int getDataFetchThreadCount() {
-    return _dataFetchThreadCount;
-  }
-
-  public void setDataFetchThreadCount(int dataFetchThreadCount) {
-    _dataFetchThreadCount = dataFetchThreadCount;
-  }
-
-  public void setConfiguration(BlurConfiguration conf) {
-    _configuration = conf;
-  }
-
-  // New interface from this point
-
-  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
-
-  @Override
-  public Session openReadSession(String table) throws BlurException, TException {
-    String uuid = UUID.randomUUID().toString();
-    return newSession(table, uuid);
-  }
-
-  private Session newSession(String table, String uuid) throws BlurException {
-    checkTable(table);
-    BlurAnalyzer analyzer = _indexServer.getAnalyzer(table);
-    Map<String, BlurIndex> blurIndexes;
-    try {
-      blurIndexes = _indexServer.getIndexes(table);
-    } catch (IOException e) {
-      LOG.error("Unknown error while trying to fetch index readers.", e);
-      throw new BException(e.getMessage(), e);
-    }
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
-    SessionInfo sessionInfo = new SessionInfo();
-    sessionInfo.setUuid(uuid);
-    sessionInfo.setAnalyzer(analyzer);
-    sessionInfo.setTableDescriptor(tableDescriptor);
-
-    for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
-      int index = BlurUtil.getShardIndex(entry.getKey());
-      try {
-        IndexReader indexReader = entry.getValue().getIndexReader();
-        // @TODO use new thread pool here
-        IndexSearcher indexSearcher = new IndexSearcher(indexReader, _searchExecutor);
-        sessionInfo.add(index, indexReader);
-        sessionInfo.add(index, indexSearcher);
-      } catch (IOException e) {
-        LOG.error("Unknown error while trying to fetch index readers.", e);
-      }
-    }
-    sessions.put(uuid, sessionInfo);
-    return new Session(uuid, table);
-  }
-
-  @Override
-  public List<TopFieldDocs> search(final Session session, final QueryArgs queryArgs) throws BlurException, TException {
-    SessionInfo info = getSessionInfo(session);
-    if (info == null) {
-      newSession(session.getTableName(), session.getSessionId());
-      info = getSessionInfo(session);
-    }
-    try {
-      Map<Integer, IndexSearcher> searchers = info.getSearchers();
-      List<Integer> shardIndexes = queryArgs.getShardIndexes();
-      TableDescriptor tableDescriptor = info.getTableDescriptor();
-      Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
-
-      List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
-      Query query = ThriftLuceneConversion.toLuceneQuery(queryArgs.query);
-      Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
-      Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
-      ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
-      boolean doDocScores = queryArgs.isDoDocScores();
-      boolean doMaxScore = queryArgs.isDoMaxScore();
-      int numberToFetch = queryArgs.getNumberToFetch();
-      for (SearchAction action : searchersToSearch) {
-        final int shardIndex = action.shardIndex;
-        if (action.type == ACTION.LOCAL) {
-          SearchCallable searchCallable = new SearchCallable(shardIndex, action.indexSearcher, after, query, filter, sort, numberToFetch, doDocScores, doMaxScore);
-          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(searchCallable);
-          futures.add(future);
-        } else if (action.type == ACTION.REMOTE) {
-          // @TODO need to send only one call per server, instead of one for
-          // each shard server
-          final Connection connection = action.remoteServer;
-          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new Callable<TopFieldDocs>() {
-            @Override
-            public TopFieldDocs call() throws Exception {
-              List<TopFieldDocs> list = BlurClientManager.execute(connection, new BlurCommand<List<TopFieldDocs>>() {
-                @Override
-                public List<TopFieldDocs> call(Client client) throws BlurException, TException {
-                  QueryArgs remoteQueryArgs = new QueryArgs(queryArgs);
-                  remoteQueryArgs.addToShardIndexes(shardIndex);
-                  return client.search(session, remoteQueryArgs);
-                }
-              });
-              return list.iterator().next();
-            }
-          });
-          futures.add(future);
-        }
-      }
-
-      List<TopFieldDocs> result = new ArrayList<TopFieldDocs>(futures.size());
-      for (Future<TopFieldDocs> future : futures) {
-        result.add(future.get());
-      }
-      return result;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  static class SearchCallable implements Callable<TopFieldDocs> {
-    private final ScoreDoc after;
-    private final Sort sort;
-    private final Filter filter;
-    private final Query query;
-    private final IndexSearcher searcher;
-    private final int count;
-    private final boolean doDocScores;
-    private final boolean doMaxScore;
-    private final int shardIndex;
-
-    SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
-      this.after = after;
-      this.searcher = searcher;
-      this.query = query;
-      this.filter = filter;
-      this.sort = sort;
-      this.count = count;
-      this.doDocScores = doDocScores;
-      this.doMaxScore = doMaxScore;
-      this.shardIndex = shardIndex;
-    }
-
-    @Override
-    public TopFieldDocs call() throws Exception {
-      return addShardIndex(doSearch());
-    }
-
-    private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
-      topFieldDocs.setShardIndex(shardIndex);
-      return topFieldDocs;
-    }
-
-    private TopFieldDocs doSearch() throws IOException {
-      if (after == null) {
-        if (sort == null) {
-          return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
-        } else {
-          return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
-        }
-      } else {
-        if (sort == null) {
-          return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
-        } else {
-          return toThrift(setShardIndexTopFieldDocs(shardIndex,
-              (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
-        }
-      }
-    }
-  }
-
-  static class SearchAction {
-    enum ACTION {
-      LOCAL, REMOTE
-    }
-
-    ACTION type;
-
-    SearchAction(int shardIndex, IndexSearcher indexSearcher) {
-      this.type = ACTION.LOCAL;
-      this.shardIndex = shardIndex;
-      this.indexSearcher = indexSearcher;
-    }
-
-    SearchAction(int shardIndex, Connection remoteServer) {
-      this.type = ACTION.REMOTE;
-      this.shardIndex = shardIndex;
-      this.remoteServer = remoteServer;
-    }
-
-    int shardIndex;
-    IndexSearcher indexSearcher;
-    Connection remoteServer;
-  }
-
-  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
-    String name = tableDescriptor.getName();
-    int shardCount = tableDescriptor.getShardCount();
-    Collection<SearchAction> searchersToSearch = new ArrayList<SearchAction>();
-    if (shardIndexes == null) {
-      shardIndexes = new ArrayList<Integer>(shardCount);
-      // all indexes
-      for (int i = 0; i < shardCount; i++) {
-        shardIndexes.add(i);
-      }
-    }
-    for (Integer index : shardIndexes) {
-      IndexSearcher searcher = searchers.get(index);
-      if (searcher != null) {
-        searchersToSearch.add(new SearchAction(index, searcher));
-      } else {
-        searchersToSearch.add(new SearchAction(index, getConnection(name, index)));
-      }
-    }
-    return searchersToSearch;
-  }
-
-  private SessionInfo getSessionInfo(Session session) {
-    return sessions.get(session.getSessionId());
-  }
-
-  @Override
-  public List<Document> doc(Session session, List<Long> docLocations, Set<String> fieldsToLoad) throws BlurException, TException {
-    try {
-      SessionInfo sessionInfo = getSessionInfo(session);
-      Map<Integer, IndexSearcher> searchers = sessionInfo.getSearchers();
-      List<Document> result = new ArrayList<Document>();
-      for (Long docLocation : docLocations) {
-        if (docLocation == null) {
-          throw new BlurException("Null docLocation is not allowed.", null);
-        }
-        int shardIndex = BlurUtil.getShardIndex(docLocation);
-        int docId = BlurUtil.getDocumentId(docLocation);
-        IndexSearcher searcher = searchers.get(shardIndex);
-        if (searcher == null) {
-          result.addAll(forwardDoc(session, shardIndex, docLocation, fieldsToLoad));
-        } else {
-          result.add(toThrift(searcher.document(docId, fieldsToLoad)));
-        }
-      }
-      return result;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  private List<Document> forwardDoc(final Session session, int shardIndex, final Long docLocation, final Set<String> fieldsToLoad) throws BlurException, TException, IOException {
-    // TODO Make more efficient by making a single call to a server for many
-    // docLocations
-    String table = session.getTableName();
-    System.out.println(table + " " + shardIndex);
-    Connection connection = getConnection(table, shardIndex);
-    return BlurClientManager.execute(connection, new BlurCommand<List<Document>>() {
-      @Override
-      public List<Document> call(Client client) throws BlurException, TException {
-        return client.doc(session, Arrays.asList(docLocation), fieldsToLoad);
-      }
-    });
-  }
-
-  @Override
-  public void closeReadSession(Session session) throws BlurException, TException {
-    SessionInfo sessionInfo = getSessionInfo(session);
-    sessionInfo.releaseReaders();
-  }
-
-  @Override
-  public List<Generation> addDocuments(MutateOptions options, List<Document> documents) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    List<Generation> generations = new ArrayList<Generation>();
-    try {
-      BlurIndex index = getIndex(table, shardIndex);
-      if (index == null) {
-        generations.addAll(forwardAddDocuments(options, documents));
-      } else {
-        long generation = index.addDocuments(waitToBeVisible, writeAheadLog, documents);
-        generations.add(new Generation(table, shardIndex, generation));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  @Override
-  public void blockUntilGenerationIsVisible(List<Generation> generations, boolean forceRefresh) throws BlurException, TException {
-    try {
-      for (Generation generation : generations) {
-        String table = generation.getTable();
-        int shardIndex = generation.getShardIndex();
-        BlurIndex index = getIndex(table, shardIndex);
-        if (index == null) {
-          forwardBlockUntilGenerationIsVisible(generation, forceRefresh);
-        } else {
-          index.blockUntilGenerationIsVisible(generation.getGeneration(), forceRefresh);
-        }
-      }
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  private void forwardBlockUntilGenerationIsVisible(final Generation generation, final boolean forceRefresh) throws BlurException, TException, IOException {
-    String table = generation.getTable();
-    int shardIndex = generation.getShardIndex();
-    Connection connection = getConnection(table, shardIndex);
-    BlurClientManager.execute(connection, new BlurCommand<Void>() {
-      @Override
-      public Void call(Client client) throws BlurException, TException {
-        client.blockUntilGenerationIsVisible(Arrays.asList(generation), forceRefresh);
-        return null;
-      }
-    });
-  }
-
-  @Override
-  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<ByteBuffer> queries) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    List<Generation> generations = new ArrayList<Generation>();
-    try {
-      BlurIndex index = getIndex(table, shardIndex);
-      if (index == null) {
-        generations.addAll(forwardDeleteDocumentsByQueries(options, queries));
-      } else {
-        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, queries.toArray(new ByteBuffer[queries.size()]));
-        generations.add(new Generation(table, shardIndex, generation));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  @Override
-  public List<Generation> deleteDocuments(MutateOptions options, List<Term> terms) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    List<Generation> generations = new ArrayList<Generation>();
-    try {
-      BlurIndex index = getIndex(table, shardIndex);
-      if (index == null) {
-        generations.addAll(forwardDeleteDocuments(options, terms));
-      } else {
-        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, terms.toArray(new Term[terms.size()]));
-        generations.add(new Generation(table, shardIndex, generation));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  @Override
-  public List<Generation> updateDocuments(MutateOptions options, List<UpdatePackage> updatePackages) throws BlurException, TException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    boolean waitToBeVisible = options.isWaitToBeVisible();
-    boolean writeAheadLog = options.isWriteAheadLog();
-    List<Generation> generations = new ArrayList<Generation>();
-    try {
-      BlurIndex index = getIndex(table, shardIndex);
-      if (index == null) {
-        generations.addAll(forwardUpdateDocuments(options, updatePackages));
-      } else {
-        long generation = index.updateDocuments(waitToBeVisible, writeAheadLog, updatePackages);
-        generations.add(new Generation(table, shardIndex, generation));
-      }
-      return generations;
-    } catch (Throwable t) {
-      LOG.error("Unknown error", t);
-      throw new BException(t.getMessage(), t);
-    }
-  }
-
-  private BlurIndex getIndex(String table, int shardIndex) throws BException {
-    Map<String, BlurIndex> blurIndexes;
-    try {
-      blurIndexes = _indexServer.getIndexes(table);
-    } catch (IOException e) {
-      LOG.error("Unknown error while trying to fetch index readers.", e);
-      throw new BException(e.getMessage(), e);
-    }
-    BlurIndex index = blurIndexes.get(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardIndex));
-    return index;
-  }
-
-  @Override
-  public List<Integer> serverLayout(String table, String server) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  @Override
-  public void cancelQuery(Session session, long id) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  @Override
-  public List<Long> queryStatusIdList(Session session) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  @Override
-  public QueryStatus queryStatus(Session session, long id) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  @Override
-  public LiveSchema schema(String table) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);
-  }
-
-  public TableLayout getLayout() {
-    return _layout;
-  }
-
-  public void setLayout(TableLayout layout) {
-    this._layout = layout;
-  }
-
-  private List<Generation> forwardAddDocuments(final MutateOptions options, final List<Document> documents) throws BlurException, TException, IOException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    Connection connection = getConnection(table, shardIndex);
-    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.addDocuments(options, documents);
-      }
-    });
-  }
-
-  private List<Generation> forwardUpdateDocuments(final MutateOptions options, final List<UpdatePackage> updatePackages) throws BlurException, TException, IOException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    Connection connection = getConnection(table, shardIndex);
-    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.updateDocuments(options, updatePackages);
-      }
-    });
-  }
-
-  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<ByteBuffer> queries) throws BlurException, TException, IOException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    Connection connection = getConnection(table, shardIndex);
-    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.deleteDocumentsByQueries(options, queries);
-      }
-    });
-  }
-
-  private List<Generation> forwardDeleteDocuments(final MutateOptions options, final List<Term> terms) throws BlurException, TException, IOException {
-    String table = options.getTable();
-    int shardIndex = options.getShardIndex();
-    Connection connection = getConnection(table, shardIndex);
-    return BlurClientManager.execute(connection, new BlurCommand<List<Generation>>() {
-      @Override
-      public List<Generation> call(Client client) throws BlurException, TException {
-        return client.deleteDocuments(options, terms);
-      }
-    });
-  }
-
-  private Connection getConnection(String table, int shardIndex) {
-    String server = _layout.findServer(table, shardIndex, TYPE.WRITABLE);
-    return new Connection(server);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ed671149/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
new file mode 100644
index 0000000..23d3d46
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
@@ -0,0 +1,292 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
+import static org.apache.blur.utils.BlurUtil.quietClose;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
+import org.apache.blur.concurrent.ThreadWatcher;
+import org.apache.blur.gui.HttpJettyServer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurFilterCache;
+import org.apache.blur.manager.DefaultBlurFilterCache;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.indexserver.BlurIndexWarmup;
+import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
+import org.apache.blur.manager.indexserver.DistributedIndexServer;
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.store.blockcache.BlockCache;
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.BlockDirectoryCache;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.Blur.Processor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ThriftBlurServer extends ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftBlurServer.class);
+
+  public static void main(String[] args) throws Exception {
+    int serverIndex = getServerIndex(args);
+    LOG.info("Setting up Shard Server");
+
+    Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
+    BlurConfiguration configuration = new BlurConfiguration();
+
+    ThriftServer server = createServer(serverIndex, configuration);
+    server.start();
+  }
+
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+    // setup block cache
+    // 134,217,728 is the slab size, therefore there are 16,384 blocks
+    // in a slab when using a block size of 8,192
+    int numberOfBlocksPerSlab = 16384;
+    int blockSize = BlockDirectory.BLOCK_SIZE;
+    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, 1);
+    Cache cache;
+    Configuration config = new Configuration();
+
+    String bindAddress = configuration.get(BLUR_SHARD_BIND_ADDRESS);
+    int bindPort = configuration.getInt(BLUR_SHARD_BIND_PORT, -1);
+    bindPort += serverIndex;
+
+    BlurMetrics blurMetrics = new BlurMetrics(config);
+
+    int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_SHARD_PORT));
+    final HttpJettyServer httpServer;
+    if (baseGuiPort > 0) {
+      int webServerPort = baseGuiPort + serverIndex;
+
+      // TODO: this got ugly, there has to be a better way to handle all these
+      // params
+      // without reversing the mvn dependancy and making blur-gui on top.
+      httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1), configuration.getInt(BLUR_SHARD_BIND_PORT, -1),
+          configuration.getInt(BLUR_GUI_CONTROLLER_PORT, -1), configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "shard", blurMetrics, null);
+    } else {
+      httpServer = null;
+    }
+
+    if (slabCount >= 1) {
+      BlockCache blockCache;
+      boolean directAllocation = configuration.getBoolean(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
+
+      int slabSize = numberOfBlocksPerSlab * blockSize;
+      LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount, directAllocation);
+      LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes", slabSize, slabCount, ((long) slabCount * (long) slabSize));
+
+      BufferStore.init(configuration, blurMetrics);
+
+      try {
+        long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
+        blockCache = new BlockCache(directAllocation, totalMemory, slabSize);
+      } catch (OutOfMemoryError e) {
+        if ("Direct buffer memory".equals(e.getMessage())) {
+          System.err
+              .println("The max direct memory is too low.  Either increase by setting (-XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages) or disable direct allocation by (blur.shard.blockcache.direct.memory.allocation=false) in blur-site.properties");
+          System.exit(1);
+        }
+        throw e;
+      }
+      cache = new BlockDirectoryCache(blockCache);
+    } else {
+      cache = BlockDirectory.NO_CACHE;
+    }
+
+    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+
+    String nodeNameHostName = getNodeName(configuration, BLUR_SHARD_HOSTNAME);
+    String nodeName = nodeNameHostName + ":" + bindPort;
+    String zkConnectionStr = isEmpty(configuration.get(BLUR_ZOOKEEPER_CONNECTION), BLUR_ZOOKEEPER_CONNECTION);
+
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
+    try {
+      ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE, 3000));
+    } catch (KeeperException e) {
+      if (e.code() == Code.CONNECTIONLOSS) {
+        System.err.println("Cannot connect zookeeper to [" + zkConnectionStr + "]");
+        System.exit(1);
+      }
+    }
+
+    BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
+
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(BlurConstants.BLUR_CLUSTER, zooKeeper);
+
+    final BlurIndexRefresher refresher = new BlurIndexRefresher();
+    refresher.init();
+
+    BlurFilterCache filterCache = getFilterCache(configuration);
+    BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
+    IndexDeletionPolicy indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
+
+    final DistributedIndexServer indexServer = new DistributedIndexServer();
+    indexServer.setBlurMetrics(blurMetrics);
+    indexServer.setCache(cache);
+    indexServer.setClusterStatus(clusterStatus);
+    indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
+    indexServer.setConfiguration(config);
+    indexServer.setNodeName(nodeName);
+    indexServer.setRefresher(refresher);
+    indexServer.setShardOpenerThreadCount(configuration.getInt(BLUR_SHARD_OPENER_THREAD_COUNT, 16));
+    indexServer.setZookeeper(zooKeeper);
+    indexServer.setFilterCache(filterCache);
+    indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
+    indexServer.setWarmup(indexWarmup);
+    indexServer.setIndexDeletionPolicy(indexDeletionPolicy);
+    indexServer.setTimeBetweenCommits(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000));
+    indexServer.setTimeBetweenRefreshs(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 500));
+    indexServer.init();
+
+    final IndexManager indexManager = new IndexManager();
+    indexManager.setIndexServer(indexServer);
+    indexManager.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
+    indexManager.setThreadCount(configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32));
+    indexManager.setBlurMetrics(blurMetrics);
+    indexManager.setFilterCache(filterCache);
+    indexManager.init();
+
+    TableLayout layout = new TableLayout() {
+      @Override
+      public String findServer(String table, int shard, TYPE type) {
+        DistributedLayoutManager manager = new DistributedLayoutManager();
+        List<String> onlineServers = clusterStatus.getOnlineServers(true);
+        List<String> offlineServers = clusterStatus.getOfflineServers(true);
+        manager.setNodes(onlineServers);
+        manager.setNodesOffline(offlineServers);
+        manager.setShards(getShardList(clusterStatus.getTableDescriptor(true, table).getShardCount()));
+        manager.init();
+
+        Map<String, String> map = manager.getLayout();
+        String server = map.get(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shard));
+        return server;
+      }
+
+      private Collection<String> getShardList(int shardCount) {
+        List<String> list = new ArrayList<String>();
+        for (int i = 0; i < shardCount; i++) {
+          list.add(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
+        }
+        return list;
+      }
+    };
+
+    final BlurServer shardServer = new BlurServer();
+    shardServer.setIndexServer(indexServer);
+    shardServer.setIndexManager(indexManager);
+    shardServer.setZookeeper(zooKeeper);
+    shardServer.setClusterStatus(clusterStatus);
+    shardServer.setConfiguration(configuration);
+    shardServer.setLayout(layout);
+    shardServer.init();
+
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, shardServer, Iface.class);
+
+    int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
+
+    Processor<Iface> processor = new Processor<Iface>(iface);
+
+    final ThriftBlurServer server = new ThriftBlurServer();
+    server.setNodeName(nodeName);
+    server.setBindAddress(bindAddress);
+    server.setBindPort(bindPort);
+    server.setThreadCount(threadCount);
+    server.setConfiguration(configuration);
+    server.setProcessor(processor);
+
+    // This will shutdown the server when the correct path is set in zk
+    BlurShutdown shutdown = new BlurShutdown() {
+      @Override
+      public void shutdown() {
+        ThreadWatcher threadWatcher = ThreadWatcher.instance();
+        quietClose(refresher, server, shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
+      }
+    };
+    server.setShutdown(shutdown);
+    new BlurServerShutDown().register(shutdown, zooKeeper);
+    return server;
+  }
+
+  private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurFilterCache) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurFilterCache();
+  }
+
+  private static BlurIndexWarmup getIndexWarmup(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_INDEX_WARMUP_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurIndexWarmup) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurIndexWarmup();
+  }
+}


Mime
View raw message