incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [13/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
new file mode 100644
index 0000000..8bc7b99
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -0,0 +1,160 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_REMOTE_FETCH_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_DEFAULT_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_FETCH_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_MUTATE_DELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
+import static org.apache.blur.utils.BlurUtil.quietClose;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
+import org.apache.blur.concurrent.ThreadWatcher;
+import org.apache.blur.gui.HttpJettyServer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class ThriftBlurControllerServer extends ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftBlurControllerServer.class);
+
+  public static void main(String[] args) throws Exception {
+    int serverIndex = getServerIndex(args);
+    LOG.info("Setting up Controller Server");
+    BlurConfiguration configuration = new BlurConfiguration();
+    ThriftServer server = createServer(serverIndex, configuration);
+    server.start();
+  }
+
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
+    String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
+    int bindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
+    bindPort += serverIndex;
+
+    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+
+    Configuration config = new Configuration();
+    BlurMetrics blurMetrics = new BlurMetrics(config);
+
+    String nodeName = ThriftBlurShardServer.getNodeName(configuration, BLUR_CONTROLLER_HOSTNAME);
+    nodeName = nodeName + ":" + bindPort;
+    String zkConnectionStr = isEmpty(configuration.get(BLUR_ZOOKEEPER_CONNECTION), BLUR_ZOOKEEPER_CONNECTION);
+
+    BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
+
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
+    ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE, 3000));
+
+    BlurUtil.setupZookeeper(zooKeeper);
+
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+
+    BlurControllerServer.BlurClient client = new BlurControllerServer.BlurClientRemote();
+
+    final BlurControllerServer controllerServer = new BlurControllerServer();
+    controllerServer.setClient(client);
+    controllerServer.setClusterStatus(clusterStatus);
+    controllerServer.setZookeeper(zooKeeper);
+    controllerServer.setNodeName(nodeName);
+    controllerServer.setRemoteFetchCount(configuration.getInt(BLUR_CONTROLLER_REMOTE_FETCH_COUNT, 100));
+    controllerServer.setMaxQueryCacheElements(configuration.getInt(BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS, 128));
+    controllerServer.setMaxTimeToLive(configuration.getLong(BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1)));
+    controllerServer.setQueryChecker(queryChecker);
+    controllerServer.setThreadCount(configuration.getInt(BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT, 64));
+    controllerServer.setMaxFetchRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES, 3));
+    controllerServer.setMaxMutateRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES, 3));
+    controllerServer.setMaxDefaultRetries(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES, 3));
+    controllerServer.setFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_FETCH_DELAY, 500));
+    controllerServer.setMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MUTATE_DELAY, 500));
+    controllerServer.setDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_DEFAULT_DELAY, 500));
+    controllerServer.setMaxFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY, 2000));
+    controllerServer.setMaxMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY, 2000));
+    controllerServer.setMaxDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY, 2000));
+
+    controllerServer.init();
+
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, controllerServer, Iface.class);
+
+    int threadCount = configuration.getInt(BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT, 32);
+
+    final ThriftBlurControllerServer server = new ThriftBlurControllerServer();
+    server.setNodeName(nodeName);
+    server.setConfiguration(configuration);
+    server.setBindAddress(bindAddress);
+    server.setBindPort(bindPort);
+    server.setThreadCount(threadCount);
+    server.setIface(iface);
+
+    int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_CONTROLLER_PORT));
+    final HttpJettyServer httpServer;
+    if (baseGuiPort > 0) {
+      int webServerPort = baseGuiPort + serverIndex;
+      // TODO: this got ugly, there has to be a better way to handle all these
+      // params
+      // without reversing the mvn dependancy and making blur-gui on top.
+      httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1), configuration.getInt(BLUR_SHARD_BIND_PORT, -1),
+          configuration.getInt(BLUR_GUI_CONTROLLER_PORT, -1), configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "controller", blurMetrics);
+    } else {
+      httpServer = null;
+    }
+
+    // This will shutdown the server when the correct path is set in zk
+    BlurShutdown shutdown = new BlurShutdown() {
+      @Override
+      public void shutdown() {
+        ThreadWatcher threadWatcher = ThreadWatcher.instance();
+        quietClose(server, controllerServer, clusterStatus, zooKeeper, threadWatcher, httpServer);
+      }
+    };
+    server.setShutdown(shutdown);
+    new BlurServerShutDown().register(shutdown, zooKeeper);
+    return server;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
new file mode 100644
index 0000000..d92c394
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -0,0 +1,264 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
+import static org.apache.blur.utils.BlurUtil.quietClose;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
+import org.apache.blur.concurrent.ThreadWatcher;
+import org.apache.blur.gui.HttpJettyServer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurFilterCache;
+import org.apache.blur.manager.BlurQueryChecker;
+import org.apache.blur.manager.DefaultBlurFilterCache;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.indexserver.BlurIndexWarmup;
+import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
+import org.apache.blur.manager.indexserver.DistributedIndexServer;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.store.BufferStore;
+import org.apache.blur.store.blockcache.BlockCache;
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.BlockDirectoryCache;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class ThriftBlurShardServer extends ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftBlurShardServer.class);
+
+  public static void main(String[] args) throws Exception {
+    int serverIndex = getServerIndex(args);
+    LOG.info("Setting up Shard Server");
+
+    Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
+    BlurConfiguration configuration = new BlurConfiguration();
+
+    ThriftServer server = createServer(serverIndex, configuration);
+    server.start();
+  }
+
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+    // setup block cache
+    // 134,217,728 is the slab size, therefore there are 16,384 blocks
+    // in a slab when using a block size of 8,192
+    int numberOfBlocksPerSlab = 16384;
+    int blockSize = BlockDirectory.BLOCK_SIZE;
+    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, 1);
+    Cache cache;
+    Configuration config = new Configuration();
+    BlurMetrics blurMetrics = new BlurMetrics(config);
+    if (slabCount >= 1) {
+      BlockCache blockCache;
+      boolean directAllocation = configuration.getBoolean(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
+
+      int slabSize = numberOfBlocksPerSlab * blockSize;
+      LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount, directAllocation);
+      LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes", slabSize, slabCount, ((long) slabCount * (long) slabSize));
+
+      BufferStore.init(configuration, blurMetrics);
+
+      try {
+        long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
+        blockCache = new BlockCache(blurMetrics, directAllocation, totalMemory, slabSize, blockSize);
+      } catch (OutOfMemoryError e) {
+        if ("Direct buffer memory".equals(e.getMessage())) {
+          System.err
+              .println("The max direct memory is too low.  Either increase by setting (-XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages) or disable direct allocation by (blur.shard.blockcache.direct.memory.allocation=false) in blur-site.properties");
+          System.exit(1);
+        }
+        throw e;
+      }
+      cache = new BlockDirectoryCache(blockCache, blurMetrics);
+    } else {
+      cache = BlockDirectory.NO_CACHE;
+    }
+
+    String bindAddress = configuration.get(BLUR_SHARD_BIND_ADDRESS);
+    int bindPort = configuration.getInt(BLUR_SHARD_BIND_PORT, -1);
+    bindPort += serverIndex;
+
+    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+
+    String nodeNameHostName = getNodeName(configuration, BLUR_SHARD_HOSTNAME);
+    String nodeName = nodeNameHostName + ":" + bindPort;
+    String zkConnectionStr = isEmpty(configuration.get(BLUR_ZOOKEEPER_CONNECTION), BLUR_ZOOKEEPER_CONNECTION);
+
+    BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
+
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
+    try {
+      ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE, 3000));
+    } catch (KeeperException e) {
+      if (e.code() == Code.CONNECTIONLOSS) {
+        System.err.println("Cannot connect zookeeper to [" + zkConnectionStr + "]");
+        System.exit(1);
+      }
+    }
+
+    BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
+
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+
+    final BlurIndexRefresher refresher = new BlurIndexRefresher();
+    refresher.init();
+
+    BlurFilterCache filterCache = getFilterCache(configuration);
+    BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
+    IndexDeletionPolicy indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
+
+    final DistributedIndexServer indexServer = new DistributedIndexServer();
+    indexServer.setBlurMetrics(blurMetrics);
+    indexServer.setCache(cache);
+    indexServer.setClusterStatus(clusterStatus);
+    indexServer.setConfiguration(config);
+    indexServer.setNodeName(nodeName);
+    indexServer.setRefresher(refresher);
+    indexServer.setShardOpenerThreadCount(configuration.getInt(BLUR_SHARD_OPENER_THREAD_COUNT, 16));
+    indexServer.setZookeeper(zooKeeper);
+    indexServer.setFilterCache(filterCache);
+    indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
+    indexServer.setWarmup(indexWarmup);
+    indexServer.setIndexDeletionPolicy(indexDeletionPolicy);
+    indexServer.setTimeBetweenCommits(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000));
+    indexServer.setTimeBetweenRefreshs(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 500));
+    indexServer.init();
+
+    final IndexManager indexManager = new IndexManager();
+    indexManager.setIndexServer(indexServer);
+    indexManager.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
+    indexManager.setThreadCount(configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32));
+    indexManager.setBlurMetrics(blurMetrics);
+    indexManager.setFilterCache(filterCache);
+    indexManager.init();
+
+    final BlurShardServer shardServer = new BlurShardServer();
+    shardServer.setIndexServer(indexServer);
+    shardServer.setIndexManager(indexManager);
+    shardServer.setZookeeper(zooKeeper);
+    shardServer.setClusterStatus(clusterStatus);
+    shardServer.setDataFetchThreadCount(configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8));
+    shardServer.setMaxQueryCacheElements(configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128));
+    shardServer.setMaxTimeToLive(configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1)));
+    shardServer.setQueryChecker(queryChecker);
+    shardServer.init();
+
+    Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, shardServer, Iface.class);
+
+    int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
+
+    final ThriftBlurShardServer server = new ThriftBlurShardServer();
+    server.setNodeName(nodeName);
+    server.setBindAddress(bindAddress);
+    server.setBindPort(bindPort);
+    server.setThreadCount(threadCount);
+    server.setIface(iface);
+    server.setConfiguration(configuration);
+
+    int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_SHARD_PORT));
+    final HttpJettyServer httpServer;
+    if (baseGuiPort > 0) {
+      int webServerPort = baseGuiPort + serverIndex;
+
+      // TODO: this got ugly, there has to be a better way to handle all these
+      // params
+      // without reversing the mvn dependancy and making blur-gui on top.
+      httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1), configuration.getInt(BLUR_SHARD_BIND_PORT, -1),
+          configuration.getInt(BLUR_GUI_CONTROLLER_PORT, -1), configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "shard", blurMetrics);
+    } else {
+      httpServer = null;
+    }
+
+    // This will shutdown the server when the correct path is set in zk
+    BlurShutdown shutdown = new BlurShutdown() {
+      @Override
+      public void shutdown() {
+        ThreadWatcher threadWatcher = ThreadWatcher.instance();
+        quietClose(refresher, server, shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
+      }
+    };
+    server.setShutdown(shutdown);
+    new BlurServerShutDown().register(shutdown, zooKeeper);
+    return server;
+  }
+
+  private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurFilterCache) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurFilterCache();
+  }
+
+  private static BlurIndexWarmup getIndexWarmup(BlurConfiguration configuration) {
+    String _blurFilterCacheClass = configuration.get(BLUR_SHARD_INDEX_WARMUP_CLASS);
+    if (_blurFilterCacheClass != null) {
+      try {
+        Class<?> clazz = Class.forName(_blurFilterCacheClass);
+        return (BlurIndexWarmup) clazz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new DefaultBlurIndexWarmup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
new file mode 100644
index 0000000..30d54e9
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -0,0 +1,160 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.thrift.ExecutorServicePerMethodCallThriftServer.Args;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+
+
+public class ThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(ThriftServer.class);
+
+  private String _nodeName;
+  private Iface _iface;
+  private TServer _server;
+  private boolean _closed;
+  private BlurConfiguration _configuration;
+  private int _threadCount;
+  private int _bindPort;
+  private String _bindAddress;
+  private BlurShutdown _shutdown;
+  private ExecutorService _executorService;
+  private ExecutorService _queryExexutorService;
+  private ExecutorService _mutateExecutorService;
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _shutdown.shutdown();
+      _server.stop();
+      _executorService.shutdownNow();
+      _queryExexutorService.shutdownNow();
+      _mutateExecutorService.shutdownNow();
+    }
+  }
+
+  protected static int getServerIndex(String[] args) {
+    for (int i = 0; i < args.length; i++) {
+      if ("-s".equals(args[i])) {
+        if (i + 1 < args.length) {
+          return Integer.parseInt(args[i + 1]);
+        }
+      }
+    }
+    return 0;
+  }
+
+  public void start() throws TTransportException {
+    Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
+    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
+
+    Args args = new Args(serverTransport);
+    args.processor(processor);
+    _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
+    args.executorService(_executorService);
+    Map<String, ExecutorService> methodCallsToExecutorService = new HashMap<String, ExecutorService>();
+    _mutateExecutorService = Executors.newThreadPool("thrift-processors-mutate", _threadCount);
+    methodCallsToExecutorService.put("mutate", _mutateExecutorService);
+    methodCallsToExecutorService.put("mutateBatch", _mutateExecutorService);
+    _queryExexutorService = Executors.newThreadPool("thrift-processors-query", _threadCount);
+    methodCallsToExecutorService.put("query", _queryExexutorService);
+    args.setMethodCallsToExecutorService(methodCallsToExecutorService);
+    _server = new ExecutorServicePerMethodCallThriftServer(args);
+    LOG.info("Starting server [{0}]", _nodeName);
+    _server.serve();
+  }
+
+  public InetSocketAddress getBindInetSocketAddress(BlurConfiguration configuration) {
+    return new InetSocketAddress(_bindAddress, _bindPort);
+  }
+
+  public static String isEmpty(String str, String name) {
+    if (str == null || str.trim().isEmpty()) {
+      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
+    }
+    return str;
+  }
+
+  public Iface getIface() {
+    return _iface;
+  }
+
+  public void setIface(Iface iface) {
+    this._iface = iface;
+  }
+
+  public String getNodeName() {
+    return _nodeName;
+  }
+
+  public void setNodeName(String nodeName) {
+    this._nodeName = nodeName;
+  }
+
+  public void setConfiguration(BlurConfiguration configuration) {
+    this._configuration = configuration;
+  }
+
+  public static String getNodeName(BlurConfiguration configuration, String hostNameProperty) throws UnknownHostException {
+    String hostName = configuration.get(hostNameProperty);
+    if (hostName == null) {
+      hostName = "";
+    }
+    hostName = hostName.trim();
+    if (hostName.isEmpty()) {
+      return InetAddress.getLocalHost().getHostName();
+    }
+    return hostName;
+  }
+
+  public void setBindPort(int bindPort) {
+    _bindPort = bindPort;
+  }
+
+  public void setBindAddress(String bindAddress) {
+    _bindAddress = bindAddress;
+  }
+
+  public void setThreadCount(int threadCount) {
+    this._threadCount = threadCount;
+  }
+
+  public BlurShutdown getShutdown() {
+    return _shutdown;
+  }
+
+  public void setShutdown(BlurShutdown shutdown) {
+    this._shutdown = shutdown;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
new file mode 100644
index 0000000..98de816
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
@@ -0,0 +1,51 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ZookeeperSystemTime {
+  public static void main(String[] args) throws InterruptedException, KeeperException, IOException, BlurException {
+    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper("localhost");
+    long tolerance = 3000;
+    checkSystemTime(zooKeeper, tolerance);
+  }
+
+  public static void checkSystemTime(ZooKeeper zooKeeper, long tolerance) throws KeeperException, InterruptedException, BlurException {
+    String path = zooKeeper.create("/" + UUID.randomUUID().toString(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    long now = System.currentTimeMillis();
+    Stat stat = zooKeeper.exists(path, false);
+    zooKeeper.delete(path, -1);
+    long ctime = stat.getCtime();
+
+    long low = now - tolerance;
+    long high = now + tolerance;
+    if (!(low <= ctime && ctime <= high)) {
+      throw new BlurException("The system time is too far out of sync with Zookeeper, check your system time and try again.", null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
new file mode 100644
index 0000000..cd0f0bc
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -0,0 +1,119 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.results.BlurResultComparator;
+import org.apache.blur.manager.results.BlurResultPeekableIteratorComparator;
+import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.Term;
+
+
+public class BlurConstants {
+
+  public static final String CONTROLLER = "controller";
+  public static final String SHARD = "shard";
+  public static final String SHARD_PREFIX = "shard-";
+  public static final Comparator<? super PeekableIterator<BlurResult>> HITS_PEEKABLE_ITERATOR_COMPARATOR = new BlurResultPeekableIteratorComparator();
+  public static final Comparator<? super BlurResult> HITS_COMPARATOR = new BlurResultComparator();
+
+  public static final String PRIME_DOC = "_prime_";
+  public static final String PRIME_DOC_VALUE = "true";
+  public static final String ROW_ID = "rowid";
+  public static final String RECORD_ID = "recordid";
+  public static final String SUPER = "super";
+  public static final String SEP = ".";
+
+  public static final String BLUR_TABLE_PATH = "blur.table.path";
+  public static final String BLUR_ZOOKEEPER_CONNECTION = "blur.zookeeper.connection";
+  public static final String BLUR_SHARD_HOSTNAME = "blur.shard.hostname";
+  public static final String BLUR_SHARD_BIND_PORT = "blur.shard.bind.port";
+  public static final String BLUR_SHARD_BIND_ADDRESS = "blur.shard.bind.address";
+  public static final String BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "blur.shard.blockcache.direct.memory.allocation";
+  public static final String BLUR_SHARD_BLOCKCACHE_SLAB_COUNT = "blur.shard.blockcache.slab.count";
+  public static final String BLUR_SHARD_SAFEMODEDELAY = "blur.shard.safemodedelay";
+  public static final String BLUR_CONTROLLER_HOSTNAME = "blur.controller.hostname";
+  public static final String BLUR_CONTROLLER_BIND_PORT = "blur.controller.bind.port";
+  public static final String BLUR_CONTROLLER_BIND_ADDRESS = "blur.controller.bind.address";
+  public static final String BLUR_QUERY_MAX_ROW_FETCH = "blur.query.max.row.fetch";
+  public static final String BLUR_QUERY_MAX_RECORD_FETCH = "blur.query.max.record.fetch";
+  public static final String BLUR_QUERY_MAX_RESULTS_FETCH = "blur.query.max.results.fetch";
+
+  public static final String BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT = "blur.shard.server.thrift.thread.count";
+  public static final String BLUR_SHARD_CACHE_MAX_TIMETOLIVE = "blur.shard.cache.max.timetolive";
+  public static final String BLUR_SHARD_FILTER_CACHE_CLASS = "blur.shard.filter.cache.class";
+  public static final String BLUR_SHARD_INDEX_WARMUP_CLASS = "blur.shard.index.warmup.class";
+  public static final String BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT = "blur.indexmanager.search.thread.count";
+  public static final String BLUR_SHARD_DATA_FETCH_THREAD_COUNT = "blur.shard.data.fetch.thread.count";
+  public static final String BLUR_MAX_CLAUSE_COUNT = "blur.max.clause.count";
+  public static final String BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.shard.cache.max.querycache.elements";
+  public static final String BLUR_SHARD_OPENER_THREAD_COUNT = "blur.shard.opener.thread.count";
+  public static final String BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE = "blur.shard.index.deletion.policy.maxage";
+  public static final String BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE = "blur.zookeeper.system.time.tolerance";
+
+  public static final String BLUR_SHARD_TIME_BETWEEN_COMMITS = "blur.shard.time.between.commits";
+  public static final String BLUR_SHARD_TIME_BETWEEN_REFRESHS = "blur.shard.time.between.refreshs";
+
+  public static final String BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT = "blur.controller.server.thrift.thread.count";
+  public static final String BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT = "blur.controller.server.remote.thread.count";
+  public static final String BLUR_CONTROLLER_CACHE_MAX_TIMETOLIVE = "blur.controller.cache.max.timetolive";
+  public static final String BLUR_CONTROLLER_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.controller.cache.max.querycache.elements";
+  public static final String BLUR_CONTROLLER_REMOTE_FETCH_COUNT = "blur.controller.remote.fetch.count";
+
+  public static final String BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES = "blur.controller.retry.max.mutate.retries";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES = "blur.controller.retry.max.default.retries";
+  public static final String BLUR_CONTROLLER_RETRY_FETCH_DELAY = "blur.controller.retry.fetch.delay";
+  public static final String BLUR_CONTROLLER_RETRY_DEFAULT_DELAY = "blur.controller.retry.default.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MUTATE_DELAY = "blur.controller.retry.mutate.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY = "blur.controller.retry.max.fetch.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY = "blur.controller.retry.max.mutate.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY = "blur.controller.retry.max.default.delay";
+  public static final String BLUR_CONTROLLER_RETRY_MAX_FETCH_RETRIES = "blur.controller.retry.max.fetch.retries";
+
+  public static final String BLUR_GUI_CONTROLLER_PORT = "blur.gui.controller.port";
+  public static final String BLUR_GUI_SHARD_PORT = "blur.gui.shard.port";
+
+  public static final String DEFAULT = "default";
+  public static final String BLUR_CLUSTER_NAME = "blur.cluster.name";
+  public static final String BLUR_CLUSTER;
+
+  public static final long ZK_WAIT_TIME = TimeUnit.SECONDS.toMillis(5);
+
+  public static final Term PRIME_DOC_TERM = new Term(PRIME_DOC, BlurConstants.PRIME_DOC_VALUE);
+  public static final Field PRIME_DOC_FIELD = new Field(PRIME_DOC, PRIME_DOC_VALUE, Store.YES, Index.NOT_ANALYZED_NO_NORMS);
+
+  static {
+    try {
+      BlurConfiguration configuration = new BlurConfiguration();
+      BLUR_CLUSTER = configuration.get(BLUR_CLUSTER_NAME, DEFAULT);
+    } catch (IOException e) {
+      throw new RuntimeException("Unknown error parsing configuration.", e);
+    }
+  }
+
+  public static int getPid() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
new file mode 100644
index 0000000..44327da
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurExecutorCompletionService.java
@@ -0,0 +1,135 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.BlurException;
+
+
+public class BlurExecutorCompletionService<T> extends ExecutorCompletionService<T> {
+
+  private AtomicInteger count = new AtomicInteger(0);
+  private Collection<Future<T>> _bag;
+  private Cancel _cancel;
+
+  public interface Cancel {
+    void cancel();
+  }
+
+  public BlurExecutorCompletionService(Executor executor, Cancel cancel) {
+    super(executor);
+    _bag = Collections.synchronizedCollection(new HashSet<Future<T>>());
+    _cancel = cancel;
+  }
+
+  public void cancelAll() {
+    for (Future<T> future : _bag) {
+      future.cancel(true);
+    }
+    _cancel.cancel();
+  }
+
+  private Future<T> remember(Future<T> future) {
+    _bag.add(future);
+    return future;
+  }
+
+  private Future<T> forget(Future<T> future) {
+    _bag.remove(future);
+    return future;
+  }
+
+  public int getRemainingCount() {
+    return count.get();
+  }
+
+  @Override
+  public Future<T> poll() {
+    Future<T> poll = super.poll();
+    if (poll != null) {
+      count.decrementAndGet();
+    }
+    return forget(poll);
+  }
+
+  @Override
+  public Future<T> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    Future<T> poll = super.poll(timeout, unit);
+    if (poll != null) {
+      count.decrementAndGet();
+    }
+    return forget(poll);
+  }
+
+  @Override
+  public Future<T> submit(Callable<T> task) {
+    Future<T> future = super.submit(task);
+    count.incrementAndGet();
+    return remember(future);
+  }
+
+  @Override
+  public Future<T> submit(Runnable task, T result) {
+    Future<T> future = super.submit(task, result);
+    count.incrementAndGet();
+    return remember(future);
+  }
+
+  @Override
+  public Future<T> take() throws InterruptedException {
+    Future<T> take = super.take();
+    if (take != null) {
+      count.decrementAndGet();
+    }
+    return forget(take);
+  }
+
+  public Future<T> poll(long timeout, TimeUnit unit, boolean throwExceptionIfTimeout, Object... parameters) throws BlurException {
+    try {
+      Future<T> future = poll(timeout, unit);
+      if (future == null) {
+        throw new BException("Call timeout [{0}]", Arrays.asList(parameters));
+      }
+      return future;
+    } catch (InterruptedException e) {
+      throw new BException("Call interrupted [{0}]", e, Arrays.asList(parameters));
+    }
+  }
+
+  public T getResultThrowException(Future<T> future, Object... parameters) throws BlurException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw new BException("Call interrupted [{0}]", e, Arrays.asList(parameters));
+    } catch (ExecutionException e) {
+      throw new BException("Call execution exception [{0}]", e.getCause(), Arrays.asList(parameters));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
new file mode 100644
index 0000000..d8dcefa
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
@@ -0,0 +1,46 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+
+public class BlurThriftRecord extends Record implements ReaderBlurRecord {
+
+  private static final long serialVersionUID = 1447192115360284850L;
+
+  @Override
+  public void addColumn(String name, String value) {
+    addToColumns(new Column(name, value));
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    // setRowIdStr(rowId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
new file mode 100644
index 0000000..7a8096e
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -0,0 +1,762 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.metrics.BlurMetrics.MethodCall;
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.util.PagedBytes.PagedBytesDataInput;
+import org.apache.lucene.util.packed.PackedInts;
+import org.apache.lucene.util.packed.PackedInts.Reader;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class BlurUtil {
+
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[] {};
+  private static final Class<?>[] EMPTY_PARAMETER_TYPES = new Class[] {};
+  private static final Log LOG = LogFactory.getLog(BlurUtil.class);
+  private static final String UNKNOWN = "UNKNOWN";
+
+  @SuppressWarnings("unchecked")
+  public static <T extends Iface> T recordMethodCallsAndAverageTimes(final BlurMetrics metrics, final T t, Class<T> clazz) {
+    InvocationHandler handler = new InvocationHandler() {
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        long start = System.nanoTime();
+        try {
+          return method.invoke(t, args);
+        } catch (InvocationTargetException e) {
+          throw e.getTargetException();
+        } finally {
+          long end = System.nanoTime();
+          MethodCall methodCall = metrics.methodCalls.get(method.getName());
+          if (methodCall == null) {
+            methodCall = new MethodCall();
+            metrics.methodCalls.put(method.getName(), methodCall);
+          }
+          methodCall.invokes.incrementAndGet();
+          methodCall.times.addAndGet(end - start);
+        }
+      }
+    };
+    return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, handler);
+  }
+
+  public static void setupZookeeper(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
+    setupZookeeper(zookeeper, null);
+  }
+
+  public synchronized static void setupZookeeper(ZooKeeper zookeeper, String cluster) throws KeeperException, InterruptedException {
+    BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getBasePath());
+    BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlineControllersPath());
+    BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getClustersPath());
+    if (cluster != null) {
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getClusterPath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getSafemodePath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getRegisteredShardsPath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlinePath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
+    }
+  }
+
+  public static BlurQuery newSimpleQuery(String query) {
+    BlurQuery blurQuery = new BlurQuery();
+    SimpleQuery simpleQuery = new SimpleQuery();
+    simpleQuery.setQueryStr(query);
+    blurQuery.setSimpleQuery(simpleQuery);
+    blurQuery.setSelector(new Selector());
+    return blurQuery;
+  }
+
+  public static void createIfMissing(ZooKeeper zookeeper, String path) throws KeeperException, InterruptedException {
+    if (zookeeper.exists(path, false) == null) {
+      try {
+        zookeeper.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        if (e.code() == Code.NODEEXISTS) {
+          return;
+        }
+        throw e;
+      }
+    }
+  }
+
+  public static List<Long> getList(AtomicLongArray atomicLongArray) {
+    if (atomicLongArray == null) {
+      return null;
+    }
+    List<Long> counts = new ArrayList<Long>(atomicLongArray.length());
+    for (int i = 0; i < atomicLongArray.length(); i++) {
+      counts.add(atomicLongArray.get(i));
+    }
+    return counts;
+  }
+
+  public static void quietClose(Object... close) {
+    if (close == null) {
+      return;
+    }
+    for (Object object : close) {
+      if (object != null) {
+        close(object);
+      }
+    }
+  }
+
+  private static void close(Object object) {
+    Class<? extends Object> clazz = object.getClass();
+    Method method;
+    try {
+      method = clazz.getMethod("close", EMPTY_PARAMETER_TYPES);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      return;
+    }
+    try {
+      method.invoke(object, EMPTY_OBJECT_ARRAY);
+    } catch (Exception e) {
+      LOG.error("Error while trying to close object [{0}]", e, object);
+    }
+  }
+
+  public static Selector newSelector(String locationId) {
+    Selector selector = new Selector();
+    selector.locationId = locationId;
+    return selector;
+  }
+
+  public static RecordMutation newRecordMutation(String family, String recordId, Column... columns) {
+    return newRecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, family, recordId, columns);
+  }
+
+  public static RecordMutation newRecordMutation(RecordMutationType type, String family, String recordId, Column... columns) {
+    Record record = new Record();
+    record.setRecordId(recordId);
+    record.setFamily(family);
+    for (Column column : columns) {
+      record.addToColumns(column);
+    }
+
+    RecordMutation mutation = new RecordMutation();
+    mutation.setRecordMutationType(type);
+    mutation.setRecord(record);
+    return mutation;
+  }
+
+  public static RecordMutation findRecordMutation(RowMutation mutation, Record record) {
+    for (RecordMutation recordMutation : mutation.recordMutations) {
+      if (match(recordMutation, record)) {
+        return recordMutation;
+      }
+    }
+    return null;
+  }
+
+  public static boolean match(RecordMutation mutation, Record record) {
+    return match(mutation.record, record);
+  }
+
+  public static boolean match(Record left, Record right) {
+    return left.recordId.equals(right.recordId) && left.family.equals(right.family);
+  }
+
+  public static RowMutation newRowMutation(String table, String rowId, RecordMutation... mutations) {
+    return newRowMutation(RowMutationType.REPLACE_ROW, table, rowId, mutations);
+  }
+
+  public static RowMutation newRowMutation(RowMutationType type, String table, String rowId, RecordMutation... mutations) {
+    RowMutation mutation = new RowMutation();
+    mutation.setRowId(rowId);
+    mutation.setTable(table);
+    mutation.setRowMutationType(type);
+    for (RecordMutation recordMutation : mutations) {
+      mutation.addToRecordMutations(recordMutation);
+    }
+    return mutation;
+  }
+
+  public static Record newRecord(String family, String recordId, Column... columns) {
+    Record record = new Record();
+    record.setRecordId(recordId);
+    record.setFamily(family);
+    record.setColumns(Arrays.asList(columns));
+    return record;
+  }
+
+  public static Row newRow(String rowId, Record... records) {
+    Row row = new Row().setId(rowId);
+    for (Record record : records) {
+      row.addToRecords(record);
+    }
+    return row;
+  }
+
+  public static Column newColumn(String name, String value) {
+    return new Column().setName(name).setValue(value);
+  }
+
+  public static byte[] toBytes(Serializable serializable) {
+    try {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      ObjectOutputStream stream = new ObjectOutputStream(outputStream);
+      stream.writeObject(serializable);
+      stream.close();
+      return outputStream.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Serializable fromBytes(byte[] bs) {
+    ObjectInputStream stream = null;
+    try {
+      stream = new ObjectInputStream(new ByteArrayInputStream(bs));
+      return (Serializable) stream.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (stream != null) {
+        try {
+          stream.close();
+        } catch (IOException e) {
+          // eat
+        }
+      }
+    }
+  }
+
+  public static List<Long> toList(AtomicLongArray atomicLongArray) {
+    if (atomicLongArray == null) {
+      return null;
+    }
+    int length = atomicLongArray.length();
+    List<Long> result = new ArrayList<Long>(length);
+    for (int i = 0; i < length; i++) {
+      result.add(atomicLongArray.get(i));
+    }
+    return result;
+  }
+
+  public static AtomicLongArray getAtomicLongArraySameLengthAsList(List<?> list) {
+    if (list == null) {
+      return null;
+    }
+    return new AtomicLongArray(list.size());
+  }
+
+  public static BlurResults convertToHits(BlurResultIterable hitsIterable, BlurQuery query, AtomicLongArray facetCounts, ExecutorService executor, Selector selector,
+      final Iface iface, final String table) throws InterruptedException, ExecutionException {
+    BlurResults results = new BlurResults();
+    results.setTotalResults(hitsIterable.getTotalResults());
+    results.setShardInfo(hitsIterable.getShardInfo());
+    if (query.minimumNumberOfResults > 0) {
+      hitsIterable.skipTo(query.start);
+      int count = 0;
+      Iterator<BlurResult> iterator = hitsIterable.iterator();
+      while (iterator.hasNext() && count < query.fetch) {
+        results.addToResults(iterator.next());
+        count++;
+      }
+    }
+    if (results.results == null) {
+      results.results = new ArrayList<BlurResult>();
+    }
+    if (facetCounts != null) {
+      results.facetCounts = BlurUtil.toList(facetCounts);
+    }
+    if (selector != null) {
+      List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
+      for (int i = 0; i < results.results.size(); i++) {
+        BlurResult result = results.results.get(i);
+        final Selector s = new Selector(selector);
+        s.setLocationId(result.locationId);
+        futures.add(executor.submit(new Callable<FetchResult>() {
+          @Override
+          public FetchResult call() throws Exception {
+            return iface.fetchRow(table, s);
+          }
+        }));
+      }
+      for (int i = 0; i < results.results.size(); i++) {
+        Future<FetchResult> future = futures.get(i);
+        BlurResult result = results.results.get(i);
+        result.setFetchResult(future.get());
+      }
+    }
+    results.query = query;
+    results.query.selector = selector;
+    return results;
+  }
+
+  public static Query readQuery(byte[] bs) throws BException {
+    return readObject(bs);
+  }
+
+  public static byte[] writeQuery(Query query) throws BException {
+    return writeObject(query);
+  }
+
+  public static Sort readSort(byte[] bs) throws BException {
+    return readObject(bs);
+  }
+
+  public static byte[] writeSort(Sort sort) throws BException {
+    return writeObject(sort);
+  }
+
+  public static Filter readFilter(byte[] bs) throws BException {
+    return readObject(bs);
+  }
+
+  public static byte[] writeFilter(Filter filter) throws BException {
+    return writeObject(filter);
+  }
+
+  private static byte[] writeObject(Serializable o) throws BException {
+    if (o == null) {
+      return null;
+    }
+    try {
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+      ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
+      outputStream.writeObject(o);
+      outputStream.close();
+      return byteArrayOutputStream.toByteArray();
+    } catch (IOException e) {
+      throw new BException("Unknown error", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T readObject(byte[] bs) throws BException {
+    if (bs == null) {
+      return null;
+    }
+    ObjectInputStream inputStream = null;
+    try {
+      inputStream = new ObjectInputStream(new ByteArrayInputStream(bs));
+      return (T) inputStream.readObject();
+    } catch (IOException e) {
+      throw new BException("Unknown error", e);
+    } catch (ClassNotFoundException e) {
+      throw new BException("Unknown error", e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          throw new BException("Unknown error", e);
+        }
+      }
+    }
+  }
+
+  public static void setStartTime(BlurQuery query) {
+    if (query.startTime == 0) {
+      query.startTime = System.currentTimeMillis();
+    }
+  }
+
+  public static String getVersion() {
+    String path = "/META-INF/maven/com.nearinfinity.blur/blur-core/pom.properties";
+    InputStream inputStream = BlurUtil.class.getResourceAsStream(path);
+    if (inputStream == null) {
+      return UNKNOWN;
+    }
+    Properties prop = new Properties();
+    try {
+      prop.load(inputStream);
+    } catch (IOException e) {
+      LOG.error("Unknown error while getting version.", e);
+      return UNKNOWN;
+    }
+    Object verison = prop.get("version");
+    if (verison == null) {
+      return UNKNOWN;
+    }
+    return verison.toString();
+  }
+
+  public static void unlockForSafeMode(ZooKeeper zookeeper, String lockPath) throws InterruptedException, KeeperException {
+    zookeeper.delete(lockPath, -1);
+    LOG.info("Lock released.");
+  }
+
+  public static String lockForSafeMode(ZooKeeper zookeeper, String nodeName, String cluster) throws KeeperException, InterruptedException {
+    LOG.info("Getting safe mode lock.");
+    final Object lock = new Object();
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+    String newPath = zookeeper.create(blurSafemodePath + "/safemode-", nodeName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        synchronized (lock) {
+          lock.notifyAll();
+        }
+      }
+    };
+    while (true) {
+      synchronized (lock) {
+        List<String> children = new ArrayList<String>(zookeeper.getChildren(blurSafemodePath, watcher));
+        Collections.sort(children);
+        if (newPath.equals(blurSafemodePath + "/" + children.get(0))) {
+          LOG.info("Lock aquired.");
+          return newPath;
+        } else {
+          lock.wait(BlurConstants.ZK_WAIT_TIME);
+        }
+      }
+    }
+  }
+
+  public static String getShardName(String prefix, int id) {
+    return prefix + buffer(id, 8);
+  }
+
+  private static String buffer(int value, int length) {
+    String str = Integer.toString(value);
+    while (str.length() < length) {
+      str = "0" + str;
+    }
+    return str;
+  }
+
+  public static String humanizeTime(long time, TimeUnit unit) {
+    long seconds = unit.toSeconds(time);
+    long hours = getHours(seconds);
+    seconds = seconds - TimeUnit.HOURS.toSeconds(hours);
+    long minutes = getMinutes(seconds);
+    seconds = seconds - TimeUnit.MINUTES.toSeconds(minutes);
+    return humanizeTime(hours, minutes, seconds);
+  }
+
+  public static String humanizeTime(long hours, long minutes, long seconds) {
+    StringBuilder builder = new StringBuilder();
+    if (hours == 0 && minutes != 0) {
+      addMinutes(builder, minutes);
+    } else if (hours != 0) {
+      addHours(builder, hours);
+      addMinutes(builder, minutes);
+    }
+    addSeconds(builder, seconds);
+    return builder.toString().trim();
+  }
+
+  private static void addHours(StringBuilder builder, long hours) {
+    builder.append(hours).append(" hours ");
+  }
+
+  private static void addMinutes(StringBuilder builder, long minutes) {
+    builder.append(minutes).append(" minutes ");
+  }
+
+  private static void addSeconds(StringBuilder builder, long seconds) {
+    builder.append(seconds).append(" seconds ");
+  }
+
+  private static long getMinutes(long seconds) {
+    return seconds / TimeUnit.MINUTES.toSeconds(1);
+  }
+
+  private static long getHours(long seconds) {
+    return seconds / TimeUnit.HOURS.toSeconds(1);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static long getMemoryUsage(IndexReader r) {
+    try {
+      if (r instanceof SegmentReader) {
+        long size = 0;
+        SegmentReader segmentReader = (SegmentReader) r;
+        Object segmentCoreReaders = getSegmentCoreReaders(segmentReader);
+        Object termInfosReader = getTermInfosReader(segmentCoreReaders);
+        Object termInfosReaderIndex = getTermInfosReaderIndex(termInfosReader);
+        PagedBytesDataInput dataInput = getDataInput(termInfosReaderIndex);
+        PackedInts.Reader indexToDataOffset = getIndexToDataOffset(termInfosReaderIndex);
+
+        Object pagedBytes = BlurUtil.getField("this$0", dataInput);
+        List<byte[]> blocks = (List<byte[]>) BlurUtil.getField("blocks", pagedBytes);
+        for (byte[] block : blocks) {
+          size += block.length;
+        }
+
+        try {
+          Class<? extends Reader> clazz = indexToDataOffset.getClass();
+          Method method = clazz.getMethod("ramBytesUsed", EMPTY_PARAMETER_TYPES);
+          method.setAccessible(true);
+          Long ramBytesUsed = (Long) method.invoke(indexToDataOffset, EMPTY_OBJECT_ARRAY);
+          size += ramBytesUsed;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        return size;
+      }
+      IndexReader[] readers = r.getSequentialSubReaders();
+      long total = 0;
+      if (readers != null) {
+        for (IndexReader reader : readers) {
+          total += getMemoryUsage(reader);
+        }
+      }
+      return total;
+    } catch (Exception e) {
+      LOG.error("Unknown error during getMemoryUsage call", e);
+      return 0;
+    }
+  }
+
+  private static PackedInts.Reader getIndexToDataOffset(Object termInfosReaderIndex) {
+    return (Reader) getField("indexToDataOffset", termInfosReaderIndex);
+  }
+
+  private static PagedBytesDataInput getDataInput(Object termInfosReaderIndex) {
+    return (PagedBytesDataInput) getField("dataInput", termInfosReaderIndex);
+  }
+
+  private static Object getTermInfosReaderIndex(Object termInfosReader) {
+    return getField("index", termInfosReader);
+  }
+
+  private static Object getTermInfosReader(Object segmentCoreReaders) {
+    return getField("tis", segmentCoreReaders);
+  }
+
+  private static Object getSegmentCoreReaders(SegmentReader segmentReader) {
+    return getField("core", segmentReader, SegmentReader.class);
+  }
+
+  private static Object getField(String name, Object o) {
+    Class<? extends Object> clazz = o.getClass();
+    return getField(name, o, clazz);
+  }
+
+  private static Object getField(String name, Object o, Class<? extends Object> clazz) {
+    try {
+      Field field = clazz.getDeclaredField(name);
+      field.setAccessible(true);
+      return field.get(o);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void createPath(ZooKeeper zookeeper, String path, byte[] data) throws KeeperException, InterruptedException {
+    zookeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+  }
+
+  public static void setupFileSystem(String uri, int shardCount) throws IOException {
+    Path tablePath = new Path(uri);
+    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), new Configuration());
+    if (createPath(fileSystem, tablePath)) {
+      LOG.info("Table uri existed.");
+      validateShardCount(shardCount, fileSystem, tablePath);
+    }
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(SHARD_PREFIX, i);
+      Path shardPath = new Path(tablePath, shardName);
+      createPath(fileSystem, shardPath);
+    }
+  }
+
+  public static void validateShardCount(int shardCount, FileSystem fileSystem, Path tablePath) throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(tablePath);
+    if (listStatus.length != shardCount) {
+      LOG.error("Number of directories in table path [" + tablePath + "] does not match definition of [" + shardCount + "] shard count.");
+      throw new RuntimeException("Number of directories in table path [" + tablePath + "] does not match definition of [" + shardCount + "] shard count.");
+    }
+  }
+
+  public static boolean createPath(FileSystem fileSystem, Path path) throws IOException {
+    if (!fileSystem.exists(path)) {
+      LOG.info("Path [{0}] does not exist, creating.", path);
+      fileSystem.mkdirs(path);
+      return false;
+    }
+    return true;
+  }
+
+  public static int zeroCheck(int i, String message) {
+    if (i < 1) {
+      throw new RuntimeException(message);
+    }
+    return i;
+  }
+
+  public static <T> T nullCheck(T t, String message) {
+    if (t == null) {
+      throw new NullPointerException(message);
+    }
+    return t;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T getInstance(String className, Class<T> c) {
+    Class<?> clazz;
+    try {
+      clazz = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      return (T) configure(clazz.newInstance());
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static <T> T configure(T t) {
+    if (t instanceof Configurable) {
+      Configurable configurable = (Configurable) t;
+      configurable.setConf(new Configuration());
+    }
+    return t;
+  }
+
+  public static byte[] read(TBase<?, ?> base) {
+    if (base == null) {
+      return null;
+    }
+    TMemoryBuffer trans = new TMemoryBuffer(1024);
+    TJSONProtocol protocol = new TJSONProtocol(trans);
+    try {
+      base.write(protocol);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+    trans.close();
+    byte[] buf = new byte[trans.length()];
+    System.arraycopy(trans.getArray(), 0, buf, 0, trans.length());
+    return buf;
+  }
+
+  public static void write(byte[] data, TBase<?, ?> base) {
+    nullCheck(null, "Data cannot be null.");
+    TMemoryBuffer trans = new TMemoryBuffer(1024);
+    TJSONProtocol protocol = new TJSONProtocol(trans);
+    try {
+      trans.write(data);
+      base.read(protocol);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+    trans.close();
+  }
+
+  public static void removeAll(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
+    List<String> list = zooKeeper.getChildren(path, false);
+    for (String p : list) {
+      removeAll(zooKeeper, path + "/" + p);
+    }
+    LOG.info("Removing path [{0}]", path);
+    zooKeeper.delete(path, -1);
+  }
+
+  public static void removeIndexFiles(String uri) throws IOException {
+    Path tablePath = new Path(uri);
+    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), new Configuration());
+    fileSystem.delete(tablePath, true);
+  }
+
+  public static RowMutation toRowMutation(String table, Row row) {
+    RowMutation rowMutation = new RowMutation();
+    rowMutation.setRowId(row.getId());
+    rowMutation.setTable(table);
+    rowMutation.setRowMutationType(RowMutationType.REPLACE_ROW);
+    List<Record> records = row.getRecords();
+    for (Record record : records) {
+      rowMutation.addToRecordMutations(toRecordMutation(record));
+    }
+    return rowMutation;
+  }
+
+  public static RecordMutation toRecordMutation(Record record) {
+    RecordMutation recordMutation = new RecordMutation();
+    recordMutation.setRecord(record);
+    recordMutation.setRecordMutationType(RecordMutationType.REPLACE_ENTIRE_RECORD);
+    return recordMutation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/Converter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/Converter.java b/src/blur-core/src/main/java/org/apache/blur/utils/Converter.java
new file mode 100644
index 0000000..cad0f45
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/Converter.java
@@ -0,0 +1,21 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public interface Converter<F, T> {
+  T convert(F from) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/ForkJoin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/ForkJoin.java b/src/blur-core/src/main/java/org/apache/blur/utils/ForkJoin.java
new file mode 100644
index 0000000..2ae88cc
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/ForkJoin.java
@@ -0,0 +1,92 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.utils.BlurExecutorCompletionService.Cancel;
+
+
+public class ForkJoin {
+
+  private static Log LOG = LogFactory.getLog(ForkJoin.class);
+
+  public static interface ParallelCall<INPUT, OUTPUT> {
+    OUTPUT call(INPUT input) throws Exception;
+  }
+
+  public static interface ParallelReturn<OUTPUT> {
+    OUTPUT merge(Merger<OUTPUT> merger) throws BlurException;
+  }
+
+  public static interface Merger<OUTPUT> {
+    OUTPUT merge(BlurExecutorCompletionService<OUTPUT> service) throws BlurException;
+  }
+
+  public static Cancel CANCEL = new Cancel() {
+    @Override
+    public void cancel() {
+      // do nothing
+    }
+  };
+
+  public static <INPUT, OUTPUT> ParallelReturn<OUTPUT> execute(ExecutorService executor, Iterable<INPUT> it, final ParallelCall<INPUT, OUTPUT> parallelCall) {
+    return execute(executor, it, parallelCall, CANCEL);
+  }
+
+  public static <INPUT, OUTPUT> ParallelReturn<OUTPUT> execute(ExecutorService executor, Iterable<INPUT> it, final ParallelCall<INPUT, OUTPUT> parallelCall, Cancel cancel) {
+    final BlurExecutorCompletionService<OUTPUT> service = new BlurExecutorCompletionService<OUTPUT>(executor, cancel);
+    for (final INPUT input : it) {
+      service.submit(new Callable<OUTPUT>() {
+        @Override
+        public OUTPUT call() throws Exception {
+          return parallelCall.call(input);
+        }
+      });
+    }
+    return new ParallelReturn<OUTPUT>() {
+      @Override
+      public OUTPUT merge(Merger<OUTPUT> merger) throws BlurException {
+        boolean exception = true;
+        try {
+          OUTPUT merge = merger.merge(service);
+          exception = false;
+          return merge;
+        } finally {
+          if (exception) {
+            service.cancelAll();
+          }
+        }
+      }
+    };
+  }
+
+  public static <T> T ignoreExecutionException(Future<T> future, T defaultValue) throws InterruptedException {
+    try {
+      return future.get();
+    } catch (ExecutionException e) {
+      LOG.error("Error while trying to execute task [{0}]", e, e.getMessage());
+      return defaultValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/IterableConverter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/IterableConverter.java b/src/blur-core/src/main/java/org/apache/blur/utils/IterableConverter.java
new file mode 100644
index 0000000..c6f22f1
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/IterableConverter.java
@@ -0,0 +1,36 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+public class IterableConverter<F, T> implements Iterable<T> {
+
+  private Converter<F, T> converter;
+  private Iterable<F> iterable;
+
+  public IterableConverter(Iterable<F> iterable, Converter<F, T> converter) {
+    this.converter = converter;
+    this.iterable = iterable;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return new IteratorConverter<F, T>(iterable.iterator(), converter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/IteratorConverter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/IteratorConverter.java b/src/blur-core/src/main/java/org/apache/blur/utils/IteratorConverter.java
new file mode 100644
index 0000000..24d769e
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/IteratorConverter.java
@@ -0,0 +1,50 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+public class IteratorConverter<F, T> implements Iterator<T> {
+
+  private Converter<F, T> converter;
+  private Iterator<F> iterator;
+
+  public IteratorConverter(Iterator<F> iterator, Converter<F, T> converter) {
+    this.converter = converter;
+    this.iterator = iterator;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public T next() {
+    try {
+      return converter.convert(iterator.next());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    iterator.remove();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/ObjectSize.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/ObjectSize.java b/src/blur-core/src/main/java/org/apache/blur/utils/ObjectSize.java
new file mode 100644
index 0000000..2c9a2c9
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/ObjectSize.java
@@ -0,0 +1,37 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.instrument.Instrumentation;
+
+public class ObjectSize {
+
+  private static Instrumentation instrumentation;
+
+  public static void premain(String agentArgs, Instrumentation inst) {
+    instrumentation = inst;
+  }
+
+  public static long getSizeInBytes(Object o) {
+    if (instrumentation == null)
+      return -1;
+    if (o == null)
+      return 0;
+    return instrumentation.getObjectSize(o);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java b/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
new file mode 100644
index 0000000..4ca0e7d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
@@ -0,0 +1,68 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReader.ReaderClosedListener;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.util.OpenBitSet;
+
+
+public class PrimeDocCache {
+
+  private static final Log LOG = LogFactory.getLog(PrimeDocCache.class);
+
+  public static final OpenBitSet EMPTY_BIT_SET = new OpenBitSet();
+
+  private static Map<Object, OpenBitSet> primeDocMap = new ConcurrentHashMap<Object, OpenBitSet>();
+
+  /**
+   * The way this method is called via warm up methods the likelihood of
+   * creating multiple bitsets during a race condition is very low, that's why
+   * this method is not synced.
+   */
+  public static OpenBitSet getPrimeDocBitSet(IndexReader reader) throws IOException {
+    Object key = reader.getCoreCacheKey();
+    OpenBitSet bitSet = primeDocMap.get(key);
+    if (bitSet == null) {
+      reader.addReaderClosedListener(new ReaderClosedListener() {
+        @Override
+        public void onClose(IndexReader reader) {
+          Object key = reader.getCoreCacheKey();
+          LOG.debug("Current size [" + primeDocMap.size() + "] Prime Doc BitSet removing for segment [" + reader + "]");
+          primeDocMap.remove(key);
+        }
+      });
+      LOG.debug("Prime Doc BitSet missing for segment [" + reader + "] current size [" + primeDocMap.size() + "]");
+      bitSet = new OpenBitSet(reader.maxDoc());
+      primeDocMap.put(key, bitSet);
+      TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
+      while (termDocs.next()) {
+        bitSet.set(termDocs.doc());
+      }
+      termDocs.close();
+    }
+    return bitSet;
+  }
+
+}


Mime
View raw message