incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [36/45] git commit: Fixed BLUR-222.
Date Sat, 28 Sep 2013 02:12:22 GMT
Fixed BLUR-222.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1cdb7fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1cdb7fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1cdb7fc7

Branch: refs/heads/apache-blur-0.2
Commit: 1cdb7fc73149d632231e3a2962a8a23c96087f78
Parents: c022cd5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Sep 21 20:06:44 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Sep 21 20:06:44 2013 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/BlurIndexReader.java    |   1 -
 .../manager/writer/SharedMergeScheduler.java    |   8 +-
 .../blur/thrift/ThriftBlurControllerServer.java |  22 +-
 .../blur/thrift/ThriftBlurShardServer.java      |  48 +++-
 .../org/apache/blur/thrift/ThriftServer.java    |  51 ++--
 .../test/java/org/apache/blur/MiniCluster.java  | 242 +++++++++++--------
 .../ZookeeperClusterStatusTest.java             |  90 +++----
 .../blur/manager/indexserver/SafeModeTest.java  |  29 ++-
 .../manager/writer/BlurIndexReaderTest.java     |   1 +
 .../manager/writer/TransactionRecorderTest.java |  38 +--
 .../org/apache/blur/thrift/BlurClusterTest.java |  64 +++--
 .../transport/TNonblockingServerSocket.java     |  20 +-
 12 files changed, 379 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 6017e0a..dd87fdb 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -30,7 +30,6 @@ import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.BlurIndexWriter;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index 87c31a6..1d99907 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -38,13 +38,13 @@ public class SharedMergeScheduler implements Runnable, Closeable {
 
   private BlockingQueue<IndexWriter> _writers = new LinkedBlockingQueue<IndexWriter>();
   private AtomicBoolean _running = new AtomicBoolean(true);
-  private ExecutorService service;
+  private ExecutorService _service;
 
   public SharedMergeScheduler() {
     int threads = 3;
-    service = Executors.newThreadPool("sharedMergeScheduler", threads, false);
+    _service = Executors.newThreadPool("sharedMergeScheduler", threads, false);
     for (int i = 0; i < threads; i++) {
-      service.submit(this);
+      _service.submit(this);
     }
   }
 
@@ -84,7 +84,7 @@ public class SharedMergeScheduler implements Runnable, Closeable {
   @Override
   public void close() throws IOException {
     _running.set(false);
-    service.shutdownNow();
+    _service.shutdownNow();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index 8017bf7..4669bc8 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -53,8 +53,8 @@ import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.metrics.ReporterSetup;
 import org.apache.blur.server.ControllerServerEventHandler;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerSocket;
 import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.MemoryReporter;
 import org.apache.blur.zookeeper.ZkUtils;
@@ -73,17 +73,24 @@ public class ThriftBlurControllerServer extends ThriftServer {
     ReporterSetup.setupReporters(configuration);
     MemoryReporter.enable();
     setupJvmMetrics();
-    ThriftServer server = createServer(serverIndex, configuration);
+    ThriftServer server = createServer(serverIndex, configuration, false);
     server.start();
   }
 
-  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration, boolean randomPort) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
     String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
     int bindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
     bindPort += serverIndex;
+    if (randomPort) {
+      bindPort = 0;
+    }
+    TNonblockingServerSocket tNonblockingServerSocket = ThriftServer.getTNonblockingServerSocket(bindAddress, bindPort);
+    if (randomPort) {
+      bindPort = tNonblockingServerSocket.getServerSocket().getLocalPort();
+    }
 
-    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+    LOG.info("Controller Server using index [{0}] bind address [{1}] random port assignment [{2}]", serverIndex, bindAddress + ":" + bindPort, randomPort);
 
     String nodeName = ThriftBlurShardServer.getNodeName(configuration, BLUR_CONTROLLER_HOSTNAME);
     nodeName = nodeName + ":" + bindPort;
@@ -95,8 +102,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
 
     final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr, sessionTimeout);
 
-    //@TODO this is confusing because controllers are in a cluster by default, but they see all the shards clusters.
-    BlurUtil.setupZookeeper(zooKeeper, BlurConstants.BLUR_CLUSTER);
+    BlurUtil.setupZookeeper(zooKeeper, null);
 
     final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper, configuration);
 
@@ -132,9 +138,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
 
     final ThriftBlurControllerServer server = new ThriftBlurControllerServer();
     server.setNodeName(nodeName);
-    server.setConfiguration(configuration);
-    server.setBindAddress(bindAddress);
-    server.setBindPort(bindPort);
+    server.setServerTransport(tNonblockingServerSocket);
     server.setThreadCount(threadCount);
     server.setEventHandler(eventHandler);
     server.setIface(iface);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index e1804d5..1807a70 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -16,7 +16,33 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.utils.BlurConstants.*;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_HEAP_PER_ROW_FETCH;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_EXPERIMENTAL_BLOCK_CACHE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FETCHCOUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_THROTTLE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_WARMUP_THREAD_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
 import java.util.concurrent.TimeUnit;
@@ -52,6 +78,7 @@ import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TServlet;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerSocket;
 import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.utils.BlurUtil;
@@ -82,16 +109,24 @@ public class ThriftBlurShardServer extends ThriftServer {
     setupJvmMetrics();
     // make this configurable
     GCWatcher.init(0.75);
-    ThriftServer server = createServer(serverIndex, configuration);
+    ThriftServer server = createServer(serverIndex, configuration, false);
     server.start();
   }
 
-  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration, boolean randomPort)
+      throws Exception {
     Configuration config = new Configuration();
 
     String bindAddress = configuration.get(BLUR_SHARD_BIND_ADDRESS);
     int bindPort = configuration.getInt(BLUR_SHARD_BIND_PORT, -1);
     bindPort += serverIndex;
+    if (randomPort) {
+      bindPort = 0;
+    }
+    TNonblockingServerSocket tNonblockingServerSocket = ThriftServer.getTNonblockingServerSocket(bindAddress, bindPort);
+    if (randomPort) {
+      bindPort = tNonblockingServerSocket.getServerSocket().getLocalPort();
+    }
 
     int baseGuiPort = Integer.parseInt(configuration.get(BLUR_GUI_SHARD_PORT));
     final HttpJettyServer httpServer;
@@ -156,7 +191,8 @@ public class ThriftBlurShardServer extends ThriftServer {
       blockCacheDirectoryFactory = new BlockCacheDirectoryFactoryV2(configuration, totalNumberOfBytes);
     }
 
-    LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
+    LOG.info("Shard Server using index [{0}] bind address [{1}] random port assignment [{2}]", serverIndex, bindAddress
+        + ":" + bindPort, randomPort);
 
     String nodeNameHostName = getNodeName(configuration, BLUR_SHARD_HOSTNAME);
     String nodeName = nodeNameHostName + ":" + bindPort;
@@ -230,11 +266,9 @@ public class ThriftBlurShardServer extends ThriftServer {
 
     final ThriftBlurShardServer server = new ThriftBlurShardServer();
     server.setNodeName(nodeName);
-    server.setBindAddress(bindAddress);
-    server.setBindPort(bindPort);
+    server.setServerTransport(tNonblockingServerSocket);
     server.setThreadCount(threadCount);
     server.setIface(iface);
-    server.setConfiguration(configuration);
     server.setEventHandler(eventHandler);
 
     // This will shutdown the server when the correct path is set in zk

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index 12b58cb..130fd86 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -34,6 +34,7 @@ import java.lang.management.OperatingSystemMXBean;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 
@@ -64,15 +65,21 @@ public class ThriftServer {
   private Iface _iface;
   private TServer _server;
   private boolean _closed;
-  private BlurConfiguration _configuration;
   private int _threadCount;
-  private int _bindPort;
-  private String _bindAddress;
   private BlurShutdown _shutdown;
   private ExecutorService _executorService;
   private ExecutorService _queryExexutorService;
   private ExecutorService _mutateExecutorService;
   private TServerEventHandler _eventHandler;
+  private TNonblockingServerSocket _serverTransport;
+
+  public TNonblockingServerSocket getServerTransport() {
+    return _serverTransport;
+  }
+
+  public void setServerTransport(TNonblockingServerSocket serverTransport) {
+    _serverTransport = serverTransport;
+  }
 
   public static void printUlimits() throws IOException {
     ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", "ulimit -a");
@@ -181,9 +188,7 @@ public class ThriftServer {
   public void start() throws TTransportException {
     _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
     Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
-
-    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
-    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
+    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(_serverTransport);
     args.processor(processor);
     args.executorService(_executorService);
     args.transportFactory(new TFramedTransport.Factory());
@@ -194,8 +199,22 @@ public class ThriftServer {
     _server.serve();
   }
 
-  public InetSocketAddress getBindInetSocketAddress(BlurConfiguration configuration) {
-    return new InetSocketAddress(_bindAddress, _bindPort);
+  public static TNonblockingServerSocket getTNonblockingServerSocket(String bindAddress, int bindPort)
+      throws TTransportException {
+    InetSocketAddress bindInetSocketAddress = getBindInetSocketAddress(bindAddress, bindPort);
+    return new TNonblockingServerSocket(bindInetSocketAddress);
+  }
+
+  public int getLocalPort() {
+    ServerSocket serverSocket = _serverTransport.getServerSocket();
+    if (serverSocket == null) {
+      return 0;
+    }
+    return serverSocket.getLocalPort();
+  }
+
+  public static InetSocketAddress getBindInetSocketAddress(String bindAddress, int bindPort) {
+    return new InetSocketAddress(bindAddress, bindPort);
   }
 
   public static String isEmpty(String str, String name) {
@@ -221,10 +240,6 @@ public class ThriftServer {
     this._nodeName = nodeName;
   }
 
-  public void setConfiguration(BlurConfiguration configuration) {
-    this._configuration = configuration;
-  }
-
   public static String getNodeName(BlurConfiguration configuration, String hostNameProperty)
       throws UnknownHostException {
     String hostName = configuration.get(hostNameProperty);
@@ -249,16 +264,8 @@ public class ThriftServer {
     return hostName;
   }
 
-  public void setBindPort(int bindPort) {
-    _bindPort = bindPort;
-  }
-
-  public void setBindAddress(String bindAddress) {
-    _bindAddress = bindAddress;
-  }
-
   public void setThreadCount(int threadCount) {
-    this._threadCount = threadCount;
+    _threadCount = threadCount;
   }
 
   public BlurShutdown getShutdown() {
@@ -266,7 +273,7 @@ public class ThriftServer {
   }
 
   public void setShutdown(BlurShutdown shutdown) {
-    this._shutdown = shutdown;
+    _shutdown = shutdown;
   }
 
   public TServerEventHandler getEventHandler() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index c341ad7..2f34119 100644
--- a/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -17,10 +17,8 @@ package org.apache.blur;
  * limitations under the License.
  */
 
-import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
@@ -30,6 +28,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,12 +47,9 @@ import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
 import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.BlurClientManager;
-import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.ThriftBlurControllerServer;
 import org.apache.blur.thrift.ThriftBlurShardServer;
 import org.apache.blur.thrift.ThriftServer;
-import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
@@ -73,39 +69,33 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 
-public abstract class MiniCluster {
+public class MiniCluster {
 
   private static Log LOG = LogFactory.getLog(MiniCluster.class);
-  private static MiniDFSCluster cluster;
-  private static Thread serverThread;
-  private static String zkConnectionString = "localhost:21810";
-  private static ZooKeeperServerMainEmbedded zooKeeperServerMain;
-  private static List<ThriftServer> controllers = new ArrayList<ThriftServer>();
-  private static List<ThriftServer> shards = new ArrayList<ThriftServer>();
-  private static String controllerConnectionStr;
+  private MiniDFSCluster cluster;
+  private Thread serverThread;
+  // private String zkConnectionString = "localhost:21810";
+  private ZooKeeperServerMainEmbedded zooKeeperServerMain;
+  private List<ThriftServer> controllers = new ArrayList<ThriftServer>();
+  private List<ThriftServer> shards = new ArrayList<ThriftServer>();
 
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException, BlurException,
       TException {
-    startDfs("./tmp/hdfs");
-    startZooKeeper("./tmp/zk");
-    startControllers(1);
-    startShards(1);
-
-    // Run the controllers/shards on custom ports.
-    // BlurConfiguration conf = new BlurConfiguration(false);
-    // conf.setInt(BLUR_CONTROLLER_BIND_PORT, 40001);
-    // conf.setInt(BLUR_SHARD_BIND_PORT, 40002);
-    // startControllers(conf, 1);
-    // startShards(conf, 1);
+    MiniCluster miniCluster = new MiniCluster();
+    miniCluster.startDfs("./tmp/hdfs");
+    miniCluster.startZooKeeper("./tmp/zk");
+    miniCluster.startControllers(1, false);
+    miniCluster.startShards(1, false);
 
     try {
-      Iface client = BlurClient.getClient(getControllerConnectionStr());
-      createTable("test", client);
+      Iface client = BlurClient.getClient(miniCluster.getControllerConnectionStr());
+      miniCluster.createTable("test", client);
       long start = System.nanoTime();
       for (int i = 0; i < 1000; i++) {
         long now = System.nanoTime();
@@ -113,44 +103,48 @@ public abstract class MiniCluster {
           System.out.println("Total [" + i + "]");
           start = now;
         }
-        addRow("test", i, client);
+        miniCluster.addRow("test", i, client);
       }
 
       // This waits for all the data to become visible.
       Thread.sleep(2000);
 
       for (int i = 0; i < 1000; i++) {
-        searchRow("test", i, client);
+        miniCluster.searchRow("test", i, client);
       }
 
     } finally {
-      stopShards();
-      stopControllers();
-      shutdownZooKeeper();
-      shutdownDfs();
+      miniCluster.stopShards();
+      miniCluster.stopControllers();
+      miniCluster.shutdownZooKeeper();
+      miniCluster.shutdownDfs();
     }
   }
 
-  public static void startBlurCluster(String path, int controllerCount, int shardCount) {
+  public void startBlurCluster(String path, int controllerCount, int shardCount) {
+    startBlurCluster(path, controllerCount, shardCount, false);
+  }
+
+  public void startBlurCluster(String path, int controllerCount, int shardCount, boolean randomPort) {
     startDfs(path + "/hdfs");
-    startZooKeeper(path + "/zk");
+    startZooKeeper(path + "/zk", randomPort);
     setupBuffers();
-    startControllers(controllerCount);
-    startShards(shardCount);
+    startControllers(controllerCount, randomPort);
+    startShards(shardCount, randomPort);
   }
 
-  private static void setupBuffers() {
+  private void setupBuffers() {
     BufferStore.init(16, 16);
   }
 
-  public static void shutdownBlurCluster() {
+  public void shutdownBlurCluster() {
     stopShards();
     stopControllers();
     shutdownZooKeeper();
     shutdownDfs();
   }
 
-  private static void createTable(String test, Iface client) throws BlurException, TException, IOException {
+  private void createTable(String test, Iface client) throws BlurException, TException, IOException {
     final TableDescriptor descriptor = new TableDescriptor();
     descriptor.setName(test);
     descriptor.setShardCount(7);
@@ -158,11 +152,20 @@ public abstract class MiniCluster {
     client.createTable(descriptor);
   }
 
-  public static String getControllerConnectionStr() {
-    return controllerConnectionStr;
+  public String getControllerConnectionStr() {
+    StringBuilder builder = new StringBuilder();
+    for (ThriftServer server : controllers) {
+      if (builder.length() != 0) {
+        builder.append(',');
+      }
+      String hostName = server.getServerTransport().getBindAddr().getHostName();
+      int localPort = server.getServerTransport().getServerSocket().getLocalPort();
+      builder.append(hostName + ":" + localPort);
+    }
+    return builder.toString();
   }
 
-  private static void addRow(String table, int i, Iface client) throws BlurException, TException {
+  private void addRow(String table, int i, Iface client) throws BlurException, TException {
     Row row = new Row();
     row.setId(Integer.toString(i));
     Record record = new Record();
@@ -175,7 +178,7 @@ public abstract class MiniCluster {
     client.mutate(rowMutation);
   }
 
-  private static void searchRow(String table, int i, Iface client) throws BlurException, TException {
+  private void searchRow(String table, int i, Iface client) throws BlurException, TException {
     BlurQuery blurQuery = BlurThriftHelper.newSimpleQuery("test.test:" + i);
     System.out.println("Running [" + blurQuery + "]");
     BlurResults results = client.query(table, blurQuery);
@@ -184,24 +187,24 @@ public abstract class MiniCluster {
     }
   }
 
-  public static void stopControllers() {
+  public void stopControllers() {
     for (ThriftServer s : controllers) {
       s.close();
     }
   }
 
-  public static void stopShards() {
+  public void stopShards() {
     for (ThriftServer s : shards) {
       s.close();
     }
   }
 
-  public static void startControllers(int num) {
+  public void startControllers(int num, boolean randomPort) {
     BlurConfiguration configuration = getBlurConfiguration();
-    startControllers(configuration, num);
+    startControllers(configuration, num, randomPort);
   }
 
-  private static BlurConfiguration getBlurConfiguration(BlurConfiguration overrides) {
+  private BlurConfiguration getBlurConfiguration(BlurConfiguration overrides) {
     BlurConfiguration conf = getBlurConfiguration();
 
     for (Map.Entry<String, String> over : overrides.getProperties().entrySet()) {
@@ -210,7 +213,7 @@ public abstract class MiniCluster {
     return conf;
   }
 
-  private static BlurConfiguration getBlurConfiguration() {
+  private BlurConfiguration getBlurConfiguration() {
     BlurConfiguration configuration;
     try {
       configuration = new BlurConfiguration();
@@ -228,34 +231,26 @@ public abstract class MiniCluster {
     return configuration;
   }
 
-  public static void startControllers(BlurConfiguration configuration, int num) {
-    StringBuilder builder = new StringBuilder();
+  public void startControllers(BlurConfiguration configuration, int num, boolean randomPort) {
     BlurConfiguration localConf = getBlurConfiguration(configuration);
-    int controllerPort = localConf.getInt(BLUR_CONTROLLER_BIND_PORT, 40010);
     for (int i = 0; i < num; i++) {
       try {
-        ThriftServer server = ThriftBlurControllerServer.createServer(i, localConf);
+        ThriftServer server = ThriftBlurControllerServer.createServer(i, localConf, randomPort);
         controllers.add(server);
-        Connection connection = new Connection("localhost", controllerPort + i);
-        if (builder.length() != 0) {
-          builder.append(',');
-        }
-        builder.append(connection.getConnectionStr());
-        startServer(server, connection);
+        startServer(server);
       } catch (Exception e) {
         LOG.error(e);
         throw new RuntimeException(e);
       }
     }
-    controllerConnectionStr = builder.toString();
   }
 
-  public static void startShards(int num) {
+  public void startShards(int num, boolean randomPort) {
     BlurConfiguration configuration = getBlurConfiguration();
-    startShards(configuration, num);
+    startShards(configuration, num, randomPort);
   }
 
-  public static void startShards(final BlurConfiguration configuration, int num) {
+  public void startShards(final BlurConfiguration configuration, int num, final boolean randomPort) {
     final BlurConfiguration localConf = getBlurConfiguration(configuration);
     ExecutorService executorService = Executors.newFixedThreadPool(num);
     List<Future<ThriftServer>> futures = new ArrayList<Future<ThriftServer>>();
@@ -264,17 +259,15 @@ public abstract class MiniCluster {
       futures.add(executorService.submit(new Callable<ThriftServer>() {
         @Override
         public ThriftServer call() throws Exception {
-          return ThriftBlurShardServer.createServer(index, localConf);
+          return ThriftBlurShardServer.createServer(index, localConf, randomPort);
         }
       }));
     }
-    int shardPort = localConf.getInt(BLUR_SHARD_BIND_PORT, 40020);
     for (int i = 0; i < num; i++) {
       try {
         ThriftServer server = futures.get(i).get();
         shards.add(server);
-        Connection connection = new Connection("localhost", shardPort + i);
-        startServer(server, connection);
+        startServer(server);
       } catch (Exception e) {
         LOG.error(e);
         throw new RuntimeException(e);
@@ -282,16 +275,16 @@ public abstract class MiniCluster {
     }
   }
 
-  public static void killShardServer(int shardServer) throws IOException, InterruptedException, KeeperException {
+  public void killShardServer(int shardServer) throws IOException, InterruptedException, KeeperException {
     killShardServer(getBlurConfiguration(), shardServer);
   }
 
-  public static void killShardServer(final BlurConfiguration configuration, int shardServer) throws IOException,
+  public void killShardServer(final BlurConfiguration configuration, int shardServer) throws IOException,
       InterruptedException, KeeperException {
-    final BlurConfiguration localConf = getBlurConfiguration(configuration);
-    int shardPort = localConf.getInt(BLUR_SHARD_BIND_PORT, 40020);
+    ThriftServer thriftServer = shards.get(shardServer);
+    int shardPort = thriftServer.getServerTransport().getServerSocket().getLocalPort();
     String nodeNameHostname = ThriftServer.getNodeName(configuration, BLUR_SHARD_HOSTNAME);
-    String nodeName = nodeNameHostname + ":" + (shardPort + shardServer);
+    String nodeName = nodeNameHostname + ":" + shardPort;
     ZooKeeper zk = new ZooKeeperClient(getZkConnectionString(), 30000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
@@ -305,7 +298,7 @@ public abstract class MiniCluster {
     zk.close();
   }
 
-  private static void startServer(final ThriftServer server, Connection connection) {
+  private static void startServer(final ThriftServer server) {
     new Thread(new Runnable() {
       @Override
       public void run() {
@@ -323,27 +316,37 @@ public abstract class MiniCluster {
       } catch (InterruptedException e) {
         return;
       }
-      try {
-        Client client = BlurClientManager.newClient(connection);
-        BlurClientManager.close(client);
-        break;
-      } catch (TException e) {
-        throw new RuntimeException(e);
-      } catch (IOException e) {
-        LOG.info("Can not connection to [" + connection + "]");
+      int localPort = server.getLocalPort();
+      if (localPort == 0) {
+        continue;
+      } else {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          LOG.error("Unknown error", e);
+        }
+        return;
       }
     }
   }
 
-  public static String getZkConnectionString() {
-    return zkConnectionString;
+  public String getZkConnectionString() {
+    return zooKeeperServerMain.getConnectionString();
+  }
+
+  public void startZooKeeper(String path) {
+    startZooKeeper(true, path, false);
   }
 
-  public static void startZooKeeper(String path) {
-    startZooKeeper(true, path);
+  public void startZooKeeper(String path, boolean randomPort) {
+    startZooKeeper(true, path, randomPort);
   }
 
-  public static void startZooKeeper(boolean format, String path) {
+  public void startZooKeeper(boolean format, String path) {
+    startZooKeeper(format, path, false);
+  }
+
+  public void startZooKeeper(boolean format, String path, boolean randomPort) {
     Properties properties = new Properties();
     properties.setProperty("tickTime", "2000");
     properties.setProperty("initLimit", "10");
@@ -351,25 +354,60 @@ public abstract class MiniCluster {
 
     properties.setProperty("clientPort", "21810");
 
-    startZooKeeper(properties, format, path);
+    startZooKeeper(properties, format, path, randomPort);
   }
 
-  public static void startZooKeeper(Properties properties, String path) {
-    startZooKeeper(properties, true, path);
+  public void startZooKeeper(Properties properties, String path) {
+    startZooKeeper(properties, true, path, false);
   }
 
-  private static class ZooKeeperServerMainEmbedded extends ZooKeeperServerMain {
+  public void startZooKeeper(Properties properties, String path, boolean randomPort) {
+    startZooKeeper(properties, true, path, randomPort);
+  }
+
+  private class ZooKeeperServerMainEmbedded extends ZooKeeperServerMain {
     @Override
     public void shutdown() {
       super.shutdown();
     }
+
+    public String getConnectionString() {
+      try {
+        Field field = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory");
+        field.setAccessible(true);
+        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) field.get(this);
+        InetSocketAddress address = serverCnxnFactory.getLocalAddress();
+        if (address == null) {
+          return null;
+        }
+        int localPort = serverCnxnFactory.getLocalPort();
+        return address.getAddress().getHostAddress() + ":" + localPort;
+      } catch (NullPointerException e) {
+        return null;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
-  public static void startZooKeeper(final Properties properties, boolean format, String path) {
+  public void startZooKeeper(final Properties properties, boolean format, String path, final boolean randomPort) {
     String realPath = path + "/zk_test";
     properties.setProperty("dataDir", realPath);
     final ServerConfig serverConfig = new ServerConfig();
-    QuorumPeerConfig config = new QuorumPeerConfig();
+    QuorumPeerConfig config = new QuorumPeerConfig() {
+      @Override
+      public InetSocketAddress getClientPortAddress() {
+        InetSocketAddress clientPortAddress = super.getClientPortAddress();
+        if (randomPort) {
+          return randomPort(clientPortAddress);
+        }
+        return clientPortAddress;
+      }
+
+      private InetSocketAddress randomPort(InetSocketAddress clientPortAddress) {
+        return new InetSocketAddress(clientPortAddress.getAddress(), 0);
+      }
+    };
     try {
       config.parseProperties(properties);
     } catch (IOException e) {
@@ -403,7 +441,11 @@ public abstract class MiniCluster {
         throw new RuntimeException(e);
       }
       try {
-        ZooKeeper zk = new ZooKeeper(zkConnectionString, 30000, new Watcher() {
+        String zkConnectionString = getZkConnectionString();
+        if (zkConnectionString == null) {
+          continue;
+        }
+        ZooKeeper zk = new ZooKeeper(getZkConnectionString(), 30000, new Watcher() {
           @Override
           public void process(WatchedEvent event) {
 
@@ -421,23 +463,23 @@ public abstract class MiniCluster {
     }
   }
 
-  public static URI getFileSystemUri() throws IOException {
+  public URI getFileSystemUri() throws IOException {
     return cluster.getFileSystem().getUri();
   }
 
-  public static void startDfs(String path) {
+  public void startDfs(String path) {
     startDfs(true, path);
   }
 
-  public static void startDfs(boolean format, String path) {
+  public void startDfs(boolean format, String path) {
     startDfs(new Configuration(), format, path);
   }
 
-  public static void startDfs(Configuration conf, String path) {
+  public void startDfs(Configuration conf, String path) {
     startDfs(conf, true, path);
   }
 
-  public static void startDfs(Configuration conf, boolean format, String path) {
+  public void startDfs(Configuration conf, boolean format, String path) {
     System.setProperty("test.build.data", path);
     try {
       cluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
@@ -448,11 +490,11 @@ public abstract class MiniCluster {
     }
   }
 
-  public static void shutdownZooKeeper() {
+  public void shutdownZooKeeper() {
     zooKeeperServerMain.shutdown();
   }
 
-  public static void shutdownDfs() {
+  public void shutdownDfs() {
     if (cluster != null) {
       LOG.info("Shutting down Mini DFS ");
       try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
index 52307f0..f143ae1 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -49,13 +49,13 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 public class ZookeeperClusterStatusTest {
 
   private static final String TEST = "test";
   private static final String DEFAULT = "default";
 
   private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatusTest.class);
+  private static MiniCluster miniCluster;
   private ZooKeeper zooKeeper;
   private ZookeeperClusterStatus clusterStatus;
 
@@ -68,17 +68,18 @@ public class ZookeeperClusterStatusTest {
 
   @BeforeClass
   public static void setupOnce() throws InterruptedException, IOException, KeeperException {
-    MiniCluster.startZooKeeper("./target/zk_test");
+    miniCluster = new MiniCluster();
+    miniCluster.startZooKeeper("./target/zk_test", true);
   }
 
   @AfterClass
   public static void teardownOnce() {
-    MiniCluster.shutdownZooKeeper();
+    miniCluster.shutdownZooKeeper();
   }
 
   @Before
   public void setup() throws KeeperException, InterruptedException, IOException {
-    zooKeeper = new ZooKeeperClient(MiniCluster.getZkConnectionString(), 30000, new Watcher() {
+    zooKeeper = new ZooKeeperClient(miniCluster.getZkConnectionString(), 30000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
 
@@ -101,43 +102,46 @@ public class ZookeeperClusterStatusTest {
     assertEquals(Arrays.asList(DEFAULT), clusterList);
   }
 
-//  @Test
-//  public void testSafeModeNotSet() throws KeeperException, InterruptedException {
-//    LOG.warn("testSafeModeNotSet");
-//    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
-//    new WaitForAnswerToBeCorrect(20L) {
-//      @Override
-//      public Object run() {
-//        return clusterStatus.isInSafeMode(true, DEFAULT);
-//      }
-//    }.test(false);
-//  }
-//
-//  @Test
-//  public void testSafeModeSetInPast() throws KeeperException, InterruptedException {
-//    LOG.warn("testSafeModeSetInPast");
-//    setSafeModeInPast();
-//    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
-//    new WaitForAnswerToBeCorrect(20L) {
-//      @Override
-//      public Object run() {
-//        return clusterStatus.isInSafeMode(true, DEFAULT);
-//      }
-//    }.test(false);
-//  }
-//
-//  @Test
-//  public void testSafeModeSetInFuture() throws KeeperException, InterruptedException {
-//    LOG.warn("testSafeModeSetInFuture");
-//    setSafeModeInFuture();
-//    assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
-//    new WaitForAnswerToBeCorrect(20L) {
-//      @Override
-//      public Object run() {
-//        return clusterStatus.isInSafeMode(true, DEFAULT);
-//      }
-//    }.test(true);
-//  }
+  // @Test
+  // public void testSafeModeNotSet() throws KeeperException,
+  // InterruptedException {
+  // LOG.warn("testSafeModeNotSet");
+  // assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+  // new WaitForAnswerToBeCorrect(20L) {
+  // @Override
+  // public Object run() {
+  // return clusterStatus.isInSafeMode(true, DEFAULT);
+  // }
+  // }.test(false);
+  // }
+  //
+  // @Test
+  // public void testSafeModeSetInPast() throws KeeperException,
+  // InterruptedException {
+  // LOG.warn("testSafeModeSetInPast");
+  // setSafeModeInPast();
+  // assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+  // new WaitForAnswerToBeCorrect(20L) {
+  // @Override
+  // public Object run() {
+  // return clusterStatus.isInSafeMode(true, DEFAULT);
+  // }
+  // }.test(false);
+  // }
+  //
+  // @Test
+  // public void testSafeModeSetInFuture() throws KeeperException,
+  // InterruptedException {
+  // LOG.warn("testSafeModeSetInFuture");
+  // setSafeModeInFuture();
+  // assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
+  // new WaitForAnswerToBeCorrect(20L) {
+  // @Override
+  // public Object run() {
+  // return clusterStatus.isInSafeMode(true, DEFAULT);
+  // }
+  // }.test(true);
+  // }
 
   @Test
   public void testGetClusterNoTable() {
@@ -181,7 +185,7 @@ public class ZookeeperClusterStatusTest {
   public void testIsEnabledEnabledTable() throws KeeperException, InterruptedException {
     createTable("enabledtable", true);
     assertTrue(clusterStatus.isEnabled(false, DEFAULT, "enabledtable"));
-  
+
     new WaitForAnswerToBeCorrect(20L) {
       @Override
       public Object run() {
@@ -189,7 +193,7 @@ public class ZookeeperClusterStatusTest {
       }
     }.test(true);
   }
-  
+
   private void createTable(String name) throws KeeperException, InterruptedException {
     createTable(name, true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java b/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
index 0765ce4..bde5180 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
@@ -16,7 +16,10 @@ package org.apache.blur.manager.indexserver;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,12 +43,14 @@ public class SafeModeTest {
 
   private static String path = "./target/test-zk";
   private static ZooKeeper zk;
+  private static MiniCluster miniCluster;
 
   @BeforeClass
   public static void startZooKeeper() throws IOException {
     new File(path).mkdirs();
-    MiniCluster.startZooKeeper(path);
-    zk = new ZooKeeper(MiniCluster.getZkConnectionString(), 20000, new Watcher() {
+    miniCluster = new MiniCluster();
+    miniCluster.startZooKeeper(path, true);
+    zk = new ZooKeeper(miniCluster.getZkConnectionString(), 20000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
 
@@ -56,7 +61,7 @@ public class SafeModeTest {
   @AfterClass
   public static void stopZooKeeper() throws InterruptedException {
     zk.close();
-    MiniCluster.shutdownZooKeeper();
+    miniCluster.shutdownZooKeeper();
   }
 
   @Test
@@ -73,8 +78,16 @@ public class SafeModeTest {
       Thread.sleep(100);
     }
 
-    for (Thread t : threads) {
-      t.join();
+    boolean alive = true;
+    while (alive) {
+      alive = false;
+      for (Thread t : threads) {
+        t.join(1000);
+        if (t.isAlive()) {
+          System.out.println("Thread [" + t + "] has not finished.");
+          alive = true;
+        }
+      }
     }
 
     for (AtomicReference<Throwable> t : errors) {
@@ -101,7 +114,7 @@ public class SafeModeTest {
 
   @Test
   public void testExtraNodeStartup() throws IOException, InterruptedException, KeeperException {
-    ZooKeeper zk = new ZooKeeper(MiniCluster.getZkConnectionString(), 20000, new Watcher() {
+    ZooKeeper zk = new ZooKeeper(miniCluster.getZkConnectionString(), 20000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
 
@@ -120,7 +133,7 @@ public class SafeModeTest {
 
   @Test
   public void testSecondNodeStartup() throws IOException, InterruptedException, KeeperException {
-    ZooKeeper zk = new ZooKeeper(MiniCluster.getZkConnectionString(), 20000, new Watcher() {
+    ZooKeeper zk = new ZooKeeper(miniCluster.getZkConnectionString(), 20000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
index a04c300..59316f3 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -102,6 +102,7 @@ public class BlurIndexReaderTest {
 
   @After
   public void tearDown() throws IOException {
+    reader.close();
     mergeScheduler.close();
     closer.close();
     gc.close();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
index 601ee06..968ca77 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -58,15 +58,17 @@ import org.junit.Test;
 public class TransactionRecorderTest {
 
   private static final Log LOG = LogFactory.getLog(TransactionRecorderTest.class);
+  private static MiniCluster miniCluster;
 
   @BeforeClass
   public static void setup() {
-    MiniCluster.startDfs(new File("target", "transaction-recorder-test").toURI().toString());
+    miniCluster = new MiniCluster();
+    miniCluster.startDfs(new File("target", "transaction-recorder-test").getAbsolutePath());
   }
 
   @AfterClass
   public static void teardown() throws IOException {
-    MiniCluster.shutdownDfs();
+    miniCluster.shutdownDfs();
   }
 
   private Collection<Closeable> closeThis = new HashSet<Closeable>();
@@ -82,7 +84,7 @@ public class TransactionRecorderTest {
   public void testReplaySimpleTest() throws IOException, InterruptedException {
     TableContext.clear();
     Configuration configuration = new Configuration(false);
-    URI fileSystemUri = MiniCluster.getFileSystemUri();
+    URI fileSystemUri = miniCluster.getFileSystemUri();
     Path path = new Path(fileSystemUri.toString() + "/transaction-recorder-test");
     FileSystem fileSystem = path.getFileSystem(configuration);
     fileSystem.delete(path, true);
@@ -123,48 +125,48 @@ public class TransactionRecorderTest {
     System.out.println("assert");
     assertEquals(1, reader.numDocs());
   }
-  
+
   @Test
-  public void testConvertShouldPass(){
+  public void testConvertShouldPass() {
     String rowId = "RowId_123-1";
     Record record = new Record();
     record.setRecordId("RecordId_123-1");
     record.setFamily("Family_123-1");
-    
+
     Column column = new Column();
     column.setName("columnName_123-1");
     record.setColumns(Arrays.asList(column));
-    
+
     BlurUtil.validateRowIdAndRecord(rowId, record);
-    assert(true);
+    assert (true);
   }
-  
-  @Test(expected=IllegalArgumentException.class)
-  public void testConvertWithBadFamilyNameShouldFail(){
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testConvertWithBadFamilyNameShouldFail() {
     String rowId = "RowId_123-1";
     Record record = new Record();
     record.setRecordId("RecordId_123-1");
     record.setFamily("Family_123.1");
-    
+
     Column column = new Column();
     column.setName("columnName_123-1");
     record.setColumns(Arrays.asList(column));
-    
+
     BlurUtil.validateRowIdAndRecord(rowId, record);
     fail();
   }
-  
-  @Test(expected=IllegalArgumentException.class)
-  public void testConvertWithBadColumnNameShouldFail(){
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testConvertWithBadColumnNameShouldFail() {
     String rowId = "RowId_123-1";
     Record record = new Record();
     record.setRecordId("RecordId_123-1");
     record.setFamily("Family_123-1");
-    
+
     Column column = new Column();
     column.setName("columnName_123.1");
     record.setColumns(Arrays.asList(column));
-    
+
     BlurUtil.validateRowIdAndRecord(rowId, record);
     fail();
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 8bc99ed..54fe030 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -67,12 +67,12 @@ import org.junit.Test;
 public class BlurClusterTest {
 
   private static final int _1MB = 1000 * 1000;
-  private static final int _20MB = 20 * _1MB;
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClusterTest"));
+  private static MiniCluster miniCluster;
 
   @BeforeClass
   public static void startCluster() throws IOException {
-    GCWatcher.init(0.80);
+    GCWatcher.init(0.60);
     LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
     File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
     testDirectory.mkdirs();
@@ -90,32 +90,42 @@ public class BlurClusterTest {
     String dirPermissionNum = builder.toString();
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
-
-    MiniCluster.startBlurCluster(new File(testDirectory, "cluster").toURI().toString(), 2, 3);
+    miniCluster = new MiniCluster();
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
   }
 
   @AfterClass
   public static void shutdownCluster() {
-    MiniCluster.shutdownBlurCluster();
+    miniCluster.shutdownBlurCluster();
   }
 
   private Iface getClient() {
-    return BlurClient.getClient(MiniCluster.getControllerConnectionStr());
+    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
   }
 
   @Test
+  public void runClusterIntegrationTests() throws BlurException, TException, IOException, InterruptedException,
+      KeeperException {
+    testCreateTable();
+    testLoadTable();
+    testQueryCancel();
+    testBackPressureViaQuery();
+    testTestShardFailover();
+    testTermsList();
+    testCreateDisableAndRemoveTable();
+  }
+
   public void testCreateTable() throws BlurException, TException, IOException {
     Blur.Iface client = getClient();
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName("test");
     tableDescriptor.setShardCount(5);
-    tableDescriptor.setTableUri(MiniCluster.getFileSystemUri().toString() + "/blur/test");
+    tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/test");
     client.createTable(tableDescriptor);
     List<String> tableList = client.tableList();
     assertEquals(Arrays.asList("test"), tableList);
   }
 
-  @Test
   public void testLoadTable() throws BlurException, TException, InterruptedException {
     Iface client = getClient();
     int length = 100;
@@ -155,7 +165,6 @@ public class BlurClusterTest {
     assertFalse(schema.getFamilies().isEmpty());
   }
 
-  @Test
   public void testQueryCancel() throws BlurException, TException, InterruptedException {
     // This will make each collect in the collectors pause 250 ms per collect
     // call
@@ -190,14 +199,13 @@ public class BlurClusterTest {
     }).start();
     Thread.sleep(500);
     client.cancelQuery("test", blurQueryRow.getUuid());
-    BlurException blurException = pollForError(error, 10, TimeUnit.SECONDS, null);
+    BlurException blurException = pollForError(error, 10, TimeUnit.SECONDS, null, fail);
     if (fail.get()) {
       fail("Unknown error, failing test.");
     }
     assertEquals(blurException.getErrorType(), ErrorType.QUERY_CANCEL);
   }
 
-  @Test
   public void testBackPressureViaQuery() throws BlurException, TException, InterruptedException {
     // This will make each collect in the collectors pause 250 ms per collect
     // call
@@ -227,10 +235,15 @@ public class BlurClusterTest {
     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
     MemoryUsage usage = memoryMXBean.getHeapMemoryUsage();
     long max = usage.getMax();
+    System.out.println("Max Heap [" + max + "]");
     long used = usage.getUsed();
+    System.out.println("Used Heap [" + used + "]");
     long limit = (long) (max * 0.80);
+    System.out.println("Limit Heap [" + limit + "]");
     long difference = limit - used;
-    byte[] bufferToFillHeap = new byte[(int) (difference - _20MB)];
+    int sizeToAllocate = (int) ((int) difference * 0.50);
+    System.out.println("Allocating [" + sizeToAllocate + "] Heap [" + getHeapSize() + "]");
+    byte[] bufferToFillHeap = new byte[sizeToAllocate];
     new Thread(new Runnable() {
       @Override
       public void run() {
@@ -248,7 +261,7 @@ public class BlurClusterTest {
     }).start();
     Thread.sleep(500);
     List<byte[]> bufferToPutGcWatcherOverLimitList = new ArrayList<byte[]>();
-    BlurException blurException = pollForError(error, 10, TimeUnit.SECONDS, bufferToPutGcWatcherOverLimitList);
+    BlurException blurException = pollForError(error, 120, TimeUnit.SECONDS, bufferToPutGcWatcherOverLimitList, fail);
     if (fail.get()) {
       fail("Unknown error, failing test.");
     }
@@ -261,26 +274,35 @@ public class BlurClusterTest {
   }
 
   private BlurException pollForError(AtomicReference<BlurException> error, long period, TimeUnit timeUnit,
-      List<byte[]> bufferToPutGcWatcherOverLimitList) throws InterruptedException {
+      List<byte[]> bufferToPutGcWatcherOverLimitList, AtomicBoolean fail) throws InterruptedException {
     long s = System.nanoTime();
     long totalTime = timeUnit.toNanos(period) + s;
+    int sizeToAllocate = _1MB * 10;
     if (bufferToPutGcWatcherOverLimitList != null) {
-      bufferToPutGcWatcherOverLimitList.add(new byte[_1MB * 5]);
+      System.out.println("Allocating [" + sizeToAllocate + "] Heap [" + getHeapSize() + "]");
+      bufferToPutGcWatcherOverLimitList.add(new byte[sizeToAllocate]);
     }
     while (totalTime > System.nanoTime()) {
+      if (fail.get()) {
+        fail("The query failed.");
+      }
       BlurException blurException = error.get();
       if (blurException != null) {
         return blurException;
       }
-      Thread.sleep(500);
+      Thread.sleep(250);
       if (bufferToPutGcWatcherOverLimitList != null) {
-        bufferToPutGcWatcherOverLimitList.add(new byte[_1MB * 5]);
+        System.out.println("Allocating [" + sizeToAllocate + "] Heap [" + getHeapSize() + "]");
+        bufferToPutGcWatcherOverLimitList.add(new byte[sizeToAllocate]);
       }
     }
     return null;
   }
 
-  @Test
+  private long getHeapSize() {
+    return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
+  }
+
   public void testTestShardFailover() throws BlurException, TException, InterruptedException, IOException,
       KeeperException {
     Iface client = getClient();
@@ -294,7 +316,7 @@ public class BlurClusterTest {
     assertEquals(length, results1.getTotalResults());
     assertRowResults(results1);
 
-    MiniCluster.killShardServer(1);
+    miniCluster.killShardServer(1);
 
     // make sure the WAL syncs
     Thread.sleep(TimeUnit.SECONDS.toMillis(1));
@@ -306,7 +328,6 @@ public class BlurClusterTest {
 
   }
 
-  @Test
   public void testTermsList() throws BlurException, TException {
     Iface client = getClient();
     List<String> terms = client.terms("test", "test", "test", null, (short) 10);
@@ -337,14 +358,13 @@ public class BlurClusterTest {
     }
   }
 
-  @Test
   public void testCreateDisableAndRemoveTable() throws IOException, BlurException, TException {
     Iface client = getClient();
     String tableName = UUID.randomUUID().toString();
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName(tableName);
     tableDescriptor.setShardCount(5);
-    tableDescriptor.setTableUri(MiniCluster.getFileSystemUri().toString() + "/blur/" + tableName);
+    tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/" + tableName);
 
     for (int i = 0; i < 3; i++) {
       client.createTable(tableDescriptor);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1cdb7fc7/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
index 560e5a3..af47614 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.blur.thirdparty.thrift_0_9_0.transport;
 
 import java.io.IOException;
@@ -54,6 +53,8 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport {
    */
   private int clientTimeout_ = 0;
 
+  private InetSocketAddress bindAddr_;
+
   /**
    * Creates just a port listening server socket
    */
@@ -73,6 +74,7 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport {
   }
 
   public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    bindAddr_ = bindAddr;
     clientTimeout_ = clientTimeout;
     try {
       serverSocketChannel = ServerSocketChannel.open();
@@ -147,4 +149,20 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport {
     close();
   }
 
+  public ServerSocketChannel getServerSocketChannel() {
+    return serverSocketChannel;
+  }
+
+  public ServerSocket getServerSocket() {
+    return serverSocket_;
+  }
+
+  public int getClientTimeout() {
+    return clientTimeout_;
+  }
+  
+  public InetSocketAddress getBindAddr() {
+    return bindAddr_;
+  }
+
 }


Mime
View raw message