incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [43/92] [abbrv] [partial] Fixed BLUR-126.
Date Tue, 11 Jun 2013 02:41:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
new file mode 100644
index 0000000..275b261
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -0,0 +1,531 @@
+package org.apache.blur;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.ThriftBlurControllerServer;
+import org.apache.blur.thrift.ThriftBlurShardServer;
+import org.apache.blur.thrift.ThriftServer;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.util.BlurThriftHelper;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZooKeeperClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+public abstract class MiniCluster {
+
+  private static Log LOG = LogFactory.getLog(MiniCluster.class);
+  private static MiniDFSCluster cluster;
+  private static Thread serverThread;
+  private static String zkConnectionString = "localhost:21810";
+  private static ZooKeeperServerMainEmbedded zooKeeperServerMain;
+  private static List<ThriftServer> controllers = new ArrayList<ThriftServer>();
+  private static List<ThriftServer> shards = new ArrayList<ThriftServer>();
+  private static String controllerConnectionStr;
+
+  /**
+   * Stand-alone smoke test: starts HDFS, ZooKeeper, one controller and one
+   * shard, creates the table "test", writes 1000 rows, waits two seconds and
+   * then queries every row back. The cluster is torn down in the finally
+   * block whether or not the run succeeded.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, KeeperException, BlurException,
+      TException {
+    startDfs("./tmp/hdfs");
+    startZooKeeper("./tmp/zk");
+    startControllers(1);
+    startShards(1);
+
+    // Run the controllers/shards on custom ports.
+    // BlurConfiguration conf = new BlurConfiguration(false);
+    // conf.setInt(BLUR_CONTROLLER_BIND_PORT, 40001);
+    // conf.setInt(BLUR_SHARD_BIND_PORT, 40002);
+    // startControllers(conf, 1);
+    // startShards(conf, 1);
+
+    final String table = "test";
+    final int rowCount = 1000;
+    final long reportIntervalNanos = 5000000000L;
+    try {
+      Iface client = BlurClient.getClient(getControllerConnectionStr());
+      createTable(table, client);
+
+      // Load phase: print progress at most once per report interval.
+      long lastReport = System.nanoTime();
+      int written = 0;
+      while (written < rowCount) {
+        long now = System.nanoTime();
+        if (now > lastReport + reportIntervalNanos) {
+          System.out.println("Total [" + written + "]");
+          lastReport = now;
+        }
+        addRow(table, written, client);
+        written++;
+      }
+
+      // This waits for all the data to become visible.
+      Thread.sleep(2000);
+
+      // Verify phase: every row written above must be found exactly once.
+      int searched = 0;
+      while (searched < rowCount) {
+        searchRow(table, searched, client);
+        searched++;
+      }
+    } finally {
+      stopShards();
+      stopControllers();
+      shutdownZooKeeper();
+      shutdownDfs();
+    }
+  }
+
+  /**
+   * Starts a complete local Blur cluster rooted at {@code path}: HDFS under
+   * {@code path + "/hdfs"}, ZooKeeper under {@code path + "/zk"}, the buffer
+   * store, then the controllers followed by the shards.
+   *
+   * @param path base directory for the HDFS and ZooKeeper data
+   * @param controllerCount number of controller servers to start
+   * @param shardCount number of shard servers to start
+   */
+  public static void startBlurCluster(String path, int controllerCount, int shardCount) {
+    startDfs(path + "/hdfs");
+    startZooKeeper(path + "/zk");
+    setupBuffers();
+    startControllers(controllerCount);
+    startShards(shardCount);
+  }
+
+  /**
+   * Initializes the shared {@link BufferStore} before any server is started.
+   * NOTE(review): the meaning of the two 16s is defined by BufferStore.init
+   * and is not visible here -- confirm before changing them.
+   */
+  private static void setupBuffers() {
+    BufferStore.init(16, 16);
+  }
+
+  /**
+   * Stops the cluster in the reverse order of startBlurCluster: shards,
+   * controllers, ZooKeeper and finally HDFS.
+   */
+  public static void shutdownBlurCluster() {
+    stopShards();
+    stopControllers();
+    shutdownZooKeeper();
+    shutdownDfs();
+  }
+
+  /**
+   * Creates a seven shard table named {@code test} whose data lives under
+   * "/blur/" on the mini HDFS file system.
+   *
+   * @param test name of the table to create
+   * @param client Blur client used to issue the create call
+   */
+  private static void createTable(String test, Iface client) throws BlurException, TException, IOException {
+    String tableUri = getFileSystemUri() + "/blur/" + test;
+    TableDescriptor td = new TableDescriptor();
+    td.setTableUri(tableUri);
+    td.setAnalyzerDefinition(new AnalyzerDefinition());
+    td.setShardCount(7);
+    td.setName(test);
+    client.createTable(td);
+  }
+
+  /**
+   * Returns the comma separated controller connection string assembled by
+   * startControllers, or null if no controllers have been started yet.
+   */
+  public static String getControllerConnectionStr() {
+    return controllerConnectionStr;
+  }
+
+  /**
+   * Writes one single-record row whose row id, record id and "test" column
+   * value are all the decimal form of {@code i}. The mutation is sent with
+   * the write ahead log flag switched off.
+   */
+  private static void addRow(String table, int i, Iface client) throws BlurException, TException {
+    String id = Integer.toString(i);
+
+    Record record = new Record();
+    record.setFamily("test");
+    record.setRecordId(id);
+    record.addToColumns(new Column("test", id));
+
+    Row row = new Row();
+    row.setId(id);
+    row.addToRecords(record);
+
+    RowMutation mutation = BlurUtil.toRowMutation(table, row);
+    mutation.setWal(false);
+    client.mutate(mutation);
+  }
+
+  /**
+   * Queries {@code table} for the row written by addRow for {@code i} and
+   * fails unless exactly one result is reported.
+   *
+   * @throws RuntimeException if the total result count is not 1
+   */
+  private static void searchRow(String table, int i, Iface client) throws BlurException, TException {
+    BlurQuery query = BlurThriftHelper.newSimpleQuery("test.test:" + i);
+    System.out.println("Running [" + query + "]");
+    long total = client.query(table, query).getTotalResults();
+    if (total == 1L) {
+      return;
+    }
+    throw new RuntimeException("we got a problem here.");
+  }
+
+  /**
+   * Closes every controller started through startControllers and forgets
+   * them, so that a later start/stop cycle in the same JVM does not close
+   * already closed servers a second time.
+   */
+  public static void stopControllers() {
+    for (ThriftServer s : controllers) {
+      s.close();
+    }
+    // The list is static; without this the closed servers would linger forever.
+    controllers.clear();
+  }
+
+  /**
+   * Closes every shard server started through startShards and forgets them,
+   * so that a later start/stop cycle in the same JVM does not close already
+   * closed servers a second time.
+   */
+  public static void stopShards() {
+    for (ThriftServer s : shards) {
+      s.close();
+    }
+    // The list is static; without this the closed servers would linger forever.
+    shards.clear();
+  }
+
+  /**
+   * Starts {@code num} controllers using the default mini cluster
+   * configuration.
+   *
+   * @param num number of controller servers to start
+   */
+  public static void startControllers(int num) {
+    BlurConfiguration configuration = getBlurConfiguration();
+    startControllers(configuration, num);
+  }
+
+  /**
+   * Builds the default mini cluster configuration and then copies every
+   * property of {@code overrides} on top of it.
+   *
+   * @param overrides properties that take precedence over the defaults
+   * @return a new configuration holding defaults plus overrides
+   */
+  private static BlurConfiguration getBlurConfiguration(BlurConfiguration overrides) {
+    BlurConfiguration merged = getBlurConfiguration();
+    Map<String, String> overriding = overrides.getProperties();
+    for (String key : overriding.keySet()) {
+      String value = overriding.get(key).toString();
+      merged.set(key.toString(), value);
+    }
+    return merged;
+  }
+
+  private static BlurConfiguration getBlurConfiguration() {
+    BlurConfiguration configuration;
+    try {
+      configuration = new BlurConfiguration();
+    } catch (IOException e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+    configuration.set(BLUR_ZOOKEEPER_CONNECTION, getZkConnectionString());
+    configuration.set(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, "false");
+    configuration.set(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, "0");
+    configuration.setLong(BLUR_SHARD_SAFEMODEDELAY, 5000);
+    configuration.setInt(BLUR_GUI_CONTROLLER_PORT, -1);
+    configuration.setInt(BLUR_GUI_SHARD_PORT, -1);
+
+    return configuration;
+  }
+
+  /**
+   * Starts {@code num} controllers. Controller {@code i} is expected on port
+   * {@code BLUR_CONTROLLER_BIND_PORT + i} (default base 40010) of localhost;
+   * each one is started and awaited before the next is created. The comma
+   * separated connection strings are published through
+   * getControllerConnectionStr once all of them are up.
+   *
+   * @param configuration overrides applied on top of the default configuration
+   * @param num number of controller servers to start
+   * @throws RuntimeException wrapping any failure while creating or starting a server
+   */
+  public static void startControllers(BlurConfiguration configuration, int num) {
+    BlurConfiguration localConf = getBlurConfiguration(configuration);
+    int basePort = localConf.getInt(BLUR_CONTROLLER_BIND_PORT, 40010);
+    StringBuilder connectionStrs = new StringBuilder();
+    int index = 0;
+    while (index < num) {
+      try {
+        ThriftServer controller = ThriftBlurControllerServer.createServer(index, localConf);
+        controllers.add(controller);
+        Connection connection = new Connection("localhost", basePort + index);
+        if (connectionStrs.length() > 0) {
+          connectionStrs.append(',');
+        }
+        connectionStrs.append(connection.getConnectionStr());
+        startServer(controller, connection);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new RuntimeException(e);
+      }
+      index++;
+    }
+    controllerConnectionStr = connectionStrs.toString();
+  }
+
+  /**
+   * Starts {@code num} shard servers using the default mini cluster
+   * configuration.
+   *
+   * @param num number of shard servers to start
+   */
+  public static void startShards(int num) {
+    BlurConfiguration configuration = getBlurConfiguration();
+    startShards(configuration, num);
+  }
+
+  /**
+   * Starts {@code num} shard servers. The servers are created concurrently on
+   * a temporary thread pool and then started one after another; shard
+   * {@code i} is expected on port {@code BLUR_SHARD_BIND_PORT + i} (default
+   * base 40020) of localhost.
+   *
+   * A non-positive {@code num} is a no-op, matching startControllers. The
+   * temporary pool is always shut down so that its worker threads do not
+   * outlive this call.
+   *
+   * @param configuration overrides applied on top of the default configuration
+   * @param num number of shard servers to start
+   * @throws RuntimeException wrapping any failure while creating or starting a server
+   */
+  public static void startShards(final BlurConfiguration configuration, int num) {
+    if (num <= 0) {
+      // Executors.newFixedThreadPool rejects a pool size of zero or less.
+      return;
+    }
+    final BlurConfiguration localConf = getBlurConfiguration(configuration);
+    ExecutorService executorService = Executors.newFixedThreadPool(num);
+    try {
+      List<Future<ThriftServer>> futures = new ArrayList<Future<ThriftServer>>();
+      for (int i = 0; i < num; i++) {
+        final int index = i;
+        futures.add(executorService.submit(new Callable<ThriftServer>() {
+          @Override
+          public ThriftServer call() throws Exception {
+            return ThriftBlurShardServer.createServer(index, localConf);
+          }
+        }));
+      }
+      int shardPort = localConf.getInt(BLUR_SHARD_BIND_PORT, 40020);
+      for (int i = 0; i < num; i++) {
+        try {
+          ThriftServer server = futures.get(i).get();
+          shards.add(server);
+          Connection connection = new Connection("localhost", shardPort + i);
+          startServer(server, connection);
+        } catch (Exception e) {
+          LOG.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+    } finally {
+      // Lets submitted creation tasks finish, then releases the worker threads.
+      executorService.shutdown();
+    }
+  }
+
+  /**
+   * Removes the ZooKeeper registration of the given shard server, resolving
+   * its node name from the default mini cluster configuration.
+   *
+   * @param shardServer zero based index of the shard server
+   */
+  public static void killShardServer(int shardServer) throws IOException, InterruptedException, KeeperException {
+    killShardServer(getBlurConfiguration(), shardServer);
+  }
+
+  /**
+   * Simulates the loss of a shard server by deleting its node below the
+   * online shards path in ZooKeeper. The server process itself is not
+   * touched by this method.
+   *
+   * The node name is {@code host:port}, where both host and port are read
+   * from the same merged configuration that startShards uses to create the
+   * servers, and port is {@code BLUR_SHARD_BIND_PORT + shardServer}.
+   *
+   * @param configuration overrides applied on top of the default configuration
+   * @param shardServer zero based index of the shard server
+   */
+  public static void killShardServer(final BlurConfiguration configuration, int shardServer) throws IOException,
+      InterruptedException, KeeperException {
+    final BlurConfiguration localConf = getBlurConfiguration(configuration);
+    int shardPort = localConf.getInt(BLUR_SHARD_BIND_PORT, 40020);
+    // Use the merged configuration here too so host and port come from one source.
+    String nodeNameHostname = ThriftServer.getNodeName(localConf, BLUR_SHARD_HOSTNAME);
+    String nodeName = nodeNameHostname + ":" + (shardPort + shardServer);
+    String onlineShardsPath = ZookeeperPathConstants
+        .getOnlineShardsPath(org.apache.blur.utils.BlurConstants.BLUR_CLUSTER);
+    String path = onlineShardsPath + "/" + nodeName;
+    ZooKeeper zk = new ZooKeeperClient(getZkConnectionString(), 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    });
+    try {
+      // Version -1 deletes the node regardless of its current version.
+      zk.delete(path, -1);
+    } finally {
+      // Always release the client, even when the node does not exist.
+      zk.close();
+    }
+  }
+
+  /**
+   * Starts {@code server} on a new thread and blocks until a client can be
+   * opened against {@code connection}, polling every 50 ms. Connection
+   * attempts that fail with an IOException are logged and retried.
+   *
+   * If the calling thread is interrupted while waiting, the method returns
+   * early and leaves the interrupt flag set for the caller.
+   *
+   * NOTE(review): there is no upper bound on the wait; if the server thread
+   * dies the loop keeps polling.
+   *
+   * @param server the server to start
+   * @param connection address used to probe whether the server is reachable
+   * @throws RuntimeException wrapping a TException raised by the probe
+   */
+  private static void startServer(final ThriftServer server, Connection connection) {
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          server.start();
+        } catch (TTransportException e) {
+          LOG.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+    }).start();
+    while (true) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for the caller instead of silently dropping it.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      try {
+        Client client = BlurClientManager.newClient(connection);
+        BlurClientManager.close(client);
+        break;
+      } catch (TException e) {
+        throw new RuntimeException(e);
+      } catch (IOException e) {
+        LOG.info("Can not connection to [" + connection + "]");
+      }
+    }
+  }
+
+  /**
+   * Returns the connection string used for the embedded ZooKeeper server.
+   */
+  public static String getZkConnectionString() {
+    return zkConnectionString;
+  }
+
+  /**
+   * Starts the embedded ZooKeeper server with default settings, passing
+   * {@code true} for the format flag.
+   *
+   * @param path base directory for the ZooKeeper data
+   */
+  public static void startZooKeeper(String path) {
+    startZooKeeper(true, path);
+  }
+
+  /**
+   * Starts the embedded ZooKeeper server with the default settings of this
+   * mini cluster: client port 21810, tick time 2000, init limit 10 and sync
+   * limit 5.
+   *
+   * @param format forwarded unchanged to the Properties based overload
+   * @param path base directory for the ZooKeeper data
+   */
+  public static void startZooKeeper(boolean format, String path) {
+    Properties zkProps = new Properties();
+    // Port must agree with the host:port in the ZooKeeper connection string.
+    zkProps.setProperty("clientPort", "21810");
+
+    zkProps.setProperty("syncLimit", "5");
+    zkProps.setProperty("initLimit", "10");
+    zkProps.setProperty("tickTime", "2000");
+
+    startZooKeeper(zkProps, format, path);
+  }
+
+  /**
+   * Starts the embedded ZooKeeper server with caller supplied properties,
+   * passing {@code true} for the format flag.
+   *
+   * @param properties ZooKeeper server properties
+   * @param path base directory for the ZooKeeper data
+   */
+  public static void startZooKeeper(Properties properties, String path) {
+    startZooKeeper(properties, true, path);
+  }
+
+  /**
+   * ZooKeeperServerMain subclass whose only purpose is to re-declare
+   * shutdown() as public so that MiniCluster can call it.
+   * NOTE(review): presumably the inherited method is not accessible from
+   * this package -- confirm against the ZooKeeper version in use.
+   */
+  private static class ZooKeeperServerMainEmbedded extends ZooKeeperServerMain {
+    @Override
+    public void shutdown() {
+      super.shutdown();
+    }
+  }
+
+  /**
+   * Starts an embedded ZooKeeper server on a background thread, storing its
+   * data in {@code path + "/zk_test"}, and then waits for up to ten seconds
+   * while probing it with a client.
+   *
+   * The passed in {@code properties} object is modified: its "dataDir" entry
+   * is set to the data directory.
+   *
+   * NOTE(review): the probe connects to the fixed connection string of this
+   * class, so a custom "clientPort" in {@code properties} is not probed.
+   * NOTE(review): constructing a ZooKeeper client presumably returns before
+   * the session is established, so the probe may succeed before the server
+   * is really ready -- confirm.
+   *
+   * @param properties ZooKeeper server properties; "dataDir" is overwritten
+   * @param format when true any existing data directory is deleted first;
+   *        when false existing data is kept
+   * @param path base directory for the ZooKeeper data
+   * @throws RuntimeException if the properties are invalid, the probe fails
+   *         with an IOException or the calling thread is interrupted
+   */
+  public static void startZooKeeper(final Properties properties, boolean format, String path) {
+    String realPath = path + "/zk_test";
+    properties.setProperty("dataDir", realPath);
+    final ServerConfig serverConfig = new ServerConfig();
+    QuorumPeerConfig config = new QuorumPeerConfig();
+    try {
+      config.parseProperties(properties);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    } catch (ConfigException e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+    serverConfig.readFrom(config);
+    if (format) {
+      // Only wipe previous state when the caller asked for a fresh server.
+      rm(new File(realPath));
+    }
+    serverThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          zooKeeperServerMain = new ZooKeeperServerMainEmbedded();
+          zooKeeperServerMain.runFromConfig(serverConfig);
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+    });
+    serverThread.start();
+    long s = System.nanoTime();
+    // Probe for at most ten seconds (expressed in nanoseconds).
+    while (s + 10000000000L > System.nanoTime()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      try {
+        ZooKeeper zk = new ZooKeeper(zkConnectionString, 30000, new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+
+          }
+        });
+        zk.close();
+        break;
+      } catch (IOException e) {
+        LOG.error(e);
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Returns the URI of the file system served by the mini HDFS cluster.
+   * startDfs must have been called first; otherwise the cluster field is
+   * still null and this method fails with a NullPointerException.
+   */
+  public static URI getFileSystemUri() throws IOException {
+    return cluster.getFileSystem().getUri();
+  }
+
+  /**
+   * Starts the mini HDFS cluster with a new default Hadoop configuration,
+   * passing {@code true} for the format flag.
+   *
+   * @param path directory used for the HDFS test data
+   */
+  public static void startDfs(String path) {
+    startDfs(true, path);
+  }
+
+  /**
+   * Starts the mini HDFS cluster with a new default Hadoop configuration.
+   *
+   * @param format forwarded unchanged to the Configuration based overload
+   * @param path directory used for the HDFS test data
+   */
+  public static void startDfs(boolean format, String path) {
+    startDfs(new Configuration(), format, path);
+  }
+
+  /**
+   * Starts the mini HDFS cluster with the given Hadoop configuration,
+   * passing {@code true} for the format flag.
+   *
+   * @param conf Hadoop configuration for the cluster
+   * @param path directory used for the HDFS test data
+   */
+  public static void startDfs(Configuration conf, String path) {
+    startDfs(conf, true, path);
+  }
+
+  /**
+   * Starts a single data node mini HDFS cluster and waits until it is
+   * active. The data location is communicated to Hadoop through the
+   * "test.build.data" system property, which this method sets to
+   * {@code path}.
+   *
+   * @param conf Hadoop configuration for the cluster
+   * @param format whether the cluster should format its storage on start up;
+   *        passed through to MiniDFSCluster instead of always formatting
+   * @param path directory used for the HDFS test data
+   * @throws RuntimeException wrapping any failure while starting the cluster
+   */
+  public static void startDfs(Configuration conf, boolean format, String path) {
+    System.setProperty("test.build.data", path);
+    try {
+      // Arguments: configuration, number of data nodes, format flag, racks.
+      cluster = new MiniDFSCluster(conf, 1, format, (String[]) null);
+      cluster.waitActive();
+    } catch (Exception e) {
+      LOG.error("error opening file system", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Shuts down the embedded ZooKeeper server. Does nothing when no server
+   * instance has been created, so it is safe to call from cleanup code even
+   * if start up failed part way through.
+   */
+  public static void shutdownZooKeeper() {
+    if (zooKeeperServerMain != null) {
+      zooKeeperServerMain.shutdown();
+    }
+  }
+
+  /**
+   * Shuts down the mini HDFS cluster, closes its file systems and then
+   * forcibly stops every live thread in the current thread group whose name
+   * starts with "pool". Does nothing when no cluster has been started.
+   *
+   * Failures of the cluster shutdown itself are deliberately ignored so that
+   * they cannot fail a test.
+   *
+   * @throws RuntimeException if interrupted while waiting for a pool thread
+   *         to exit, or if the reflective lookup of a thread's executor fails
+   */
+  public static void shutdownDfs() {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        // / Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be cause of test failure.
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+
+      // This has got to be one of the worst hacks I have ever had to do.
+      // This is needed to shutdown 2 thread pools that are not shutdown by
+      // themselves.
+      ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+      // enumerate() silently drops threads that do not fit into the array and
+      // activeCount() is only an estimate, so allocate generously.
+      Thread[] threads = new Thread[Math.max(100, threadGroup.activeCount() * 2)];
+      int enumerate = threadGroup.enumerate(threads);
+      for (int i = 0; i < enumerate; i++) {
+        Thread thread = threads[i];
+        if (thread.getName().startsWith("pool")) {
+          if (thread.isAlive()) {
+            thread.interrupt();
+            LOG.info("Stopping ThreadPoolExecutor [" + thread.getName() + "]");
+            // Reach through the thread's Runnable to the executor that owns it.
+            Object target = getField(thread, "target");
+            if (target != null) {
+              ThreadPoolExecutor e = (ThreadPoolExecutor) getField(target, "this$0");
+              if (e != null) {
+                e.shutdownNow();
+              }
+            }
+            try {
+              LOG.info("Waiting for thread pool to exit [" + thread.getName() + "]");
+              thread.join();
+            } catch (InterruptedException e) {
+              // Keep the interrupt visible to callers further up the stack.
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads a field reflectively, ignoring its access modifier. The field is
+   * looked up on the runtime class of {@code o} first and then on each
+   * superclass in turn, so fields inherited from a parent class (for example
+   * a field of Thread when {@code o} is a Thread subclass) are found too.
+   *
+   * @param o object to read from; must not be null
+   * @param fieldName name of the declared field
+   * @return the current value of the field, possibly null
+   * @throws RuntimeException wrapping the cause if no class in the hierarchy
+   *         declares the field or it cannot be read
+   */
+  private static Object getField(Object o, String fieldName) {
+    try {
+      NoSuchFieldException notFound = null;
+      for (Class<?> type = o.getClass(); type != null; type = type.getSuperclass()) {
+        try {
+          Field field = type.getDeclaredField(fieldName);
+          field.setAccessible(true);
+          return field.get(o);
+        } catch (NoSuchFieldException e) {
+          // Remember the failure for the most specific class and keep climbing.
+          if (notFound == null) {
+            notFound = e;
+          }
+        }
+      }
+      throw notFound;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Recursively deletes {@code file}. A path that does not exist is ignored,
+   * and a directory whose contents cannot be listed is skipped instead of
+   * causing a NullPointerException. Deletion failures are not reported.
+   *
+   * @param file file or directory to delete
+   */
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      // listFiles() returns null on an I/O error or when access is denied.
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File f : children) {
+          rm(f);
+        }
+      }
+    }
+    file.delete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
new file mode 100644
index 0000000..a7461d4
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -0,0 +1,1035 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.thrift.generated.RecordMutationType.APPEND_COLUMN_VALUES;
+import static org.apache.blur.thrift.generated.RecordMutationType.DELETE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_COLUMNS;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RowMutationType.DELETE_ROW;
+import static org.apache.blur.thrift.generated.RowMutationType.UPDATE_ROW;
+import static org.apache.blur.thrift.util.BlurThriftHelper.match;
+import static org.apache.blur.thrift.util.BlurThriftHelper.newColumn;
+import static org.apache.blur.thrift.util.BlurThriftHelper.newRecord;
+import static org.apache.blur.thrift.util.BlurThriftHelper.newRecordMutation;
+import static org.apache.blur.thrift.util.BlurThriftHelper.newRow;
+import static org.apache.blur.thrift.util.BlurThriftHelper.newRowMutation;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.manager.indexserver.LocalIndexServer;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Facet;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class IndexManagerTest {
+  
+  // Root directory for all files written by this test.
+  private static final File TMPDIR = new File("./target/tmp");
+
+  // Name of shard 0, derived from the standard shard prefix.
+  private static final String SHARD_NAME = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, 0);
+  // Table, and the two column families, used by the fixture data.
+  private static final String TABLE = "table";
+  private static final String FAMILY = "test-family";
+  private static final String FAMILY2 = "test-family2";
+  // Recreated for every test in setUp and released in teardown.
+  private LocalIndexServer server;
+  private IndexManager indexManager;
+
+  // Per-test working directory below TMPDIR.
+  private File base;
+
+  /**
+   * Builds a fresh fixture for every test: wipes the working directory,
+   * creates a one shard table backed by a LocalIndexServer, wires an
+   * IndexManager to it with a stub ClusterStatus and finally loads the
+   * fixture rows through setupData.
+   */
+  @Before
+  public void setUp() throws BlurException, IOException, InterruptedException {
+    TableContext.clear();
+    base = new File(TMPDIR, "blur-index-manager-test");
+    // Start from an empty directory so earlier runs cannot leak index data.
+    rm(base);
+    
+    File file = new File(base, TABLE);
+    file.mkdirs();
+
+    final TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName(TABLE);
+    tableDescriptor.setTableUri(file.toURI().toString());
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    // NOTE(review): presumably a refresh interval in milliseconds -- confirm.
+    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(100));
+    tableDescriptor.setShardCount(1);
+    server = new LocalIndexServer(tableDescriptor);
+
+    indexManager = new IndexManager();
+    indexManager.setStatusCleanupTimerDelay(1000);
+    indexManager.setIndexServer(server);
+    indexManager.setThreadCount(1);
+    // Stub cluster status: only getTableDescriptor and getCluster return a
+    // value; every other method throws so that unexpected use is noticed.
+    indexManager.setClusterStatus(new ClusterStatus() {
+      
+      @Override
+      public void removeTable(String cluster, String table, boolean deleteIndexFiles) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean isReadOnly(boolean useCache, String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean isOpen() {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean isInSafeMode(boolean useCache, String cluster) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean isEnabled(boolean useCache, String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean isBlockCacheEnabled(String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public List<String> getTableList(boolean useCache, String cluster) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table) {
+        return tableDescriptor;
+      }
+      
+      @Override
+      public List<String> getShardServerList(String cluster) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public int getShardCount(boolean useCache, String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public List<String> getOnlineShardServers(boolean useCache, String cluster) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public List<String> getControllerServerList() {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public List<String> getClusterList(boolean useCache) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public String getCluster(boolean useCache, String table) {
+        return BlurConstants.BLUR_CLUSTER;
+      }
+      
+      @Override
+      public Set<String> getBlockCacheFileTypes(String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public boolean exists(boolean useCache, String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public void enableTable(String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public void disableTable(String cluster, String table) {
+        throw new RuntimeException("Not impl");
+      }
+      
+      @Override
+      public void createTable(TableDescriptor tableDescriptor) {
+        throw new RuntimeException("Not impl");
+      }
+    });
+    indexManager.init();
+    setupData();
+  }
+
+  /**
+   * Closes the index manager and drops the references created in setUp so
+   * that each test starts from a clean slate.
+   */
+  @After
+  public void teardown() {
+    indexManager.close();
+    indexManager = null;
+    server = null;
+  }
+
+  /**
+   * Recursively deletes {@code file}. A directory whose contents cannot be
+   * listed is skipped instead of causing a NullPointerException. Deletion
+   * failures are not reported.
+   *
+   * @param file file or directory to delete
+   */
+  private void rm(File file) {
+    if (file.isDirectory()) {
+      // listFiles() returns null on an I/O error or when access is denied.
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File f : children) {
+          rm(f);
+        }
+      }
+    }
+    file.delete();
+  }
+
+  /**
+   * Loads the seven fixture rows ("row-1" to "row-7") that the tests query.
+   * The rows are applied in order; only the last mutation has its
+   * waitToBeVisible flag set.
+   */
+  private void setupData() throws BlurException, IOException {
+    RowMutation row1 = newRowMutation(TABLE, "row-1",
+        newRecordMutation(FAMILY, "record-1",
+            newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+
+    RowMutation row2 = newRowMutation(TABLE, "row-2",
+        newRecordMutation(FAMILY, "record-2",
+            newColumn("testcol1", "value4"), newColumn("testcol2", "value5"), newColumn("testcol3", "value6")),
+        newRecordMutation(FAMILY, "record-2B",
+            newColumn("testcol2", "value234123"), newColumn("testcol3", "value234123")));
+
+    RowMutation row3 = newRowMutation(TABLE, "row-3",
+        newRecordMutation(FAMILY, "record-3",
+            newColumn("testcol1", "value7"), newColumn("testcol2", "value8"), newColumn("testcol3", "value9")));
+
+    RowMutation row4 = newRowMutation(TABLE, "row-4",
+        newRecordMutation(FAMILY, "record-4",
+            newColumn("testcol1", "value1"), newColumn("testcol2", "value5"), newColumn("testcol3", "value9")),
+        newRecordMutation(FAMILY, "record-4B",
+            newColumn("testcol2", "value234123"), newColumn("testcol3", "value234123")));
+
+    RowMutation row5 = newRowMutation(TABLE, "row-5",
+        newRecordMutation(FAMILY, "record-5A",
+            newColumn("testcol1", "value13"), newColumn("testcol2", "value14"), newColumn("testcol3", "value15")),
+        newRecordMutation(FAMILY, "record-5B",
+            newColumn("testcol1", "value16"), newColumn("testcol2", "value17"), newColumn("testcol3", "value18"),
+            newColumn("testcol3", "value19")));
+
+    RowMutation row6 = newRowMutation(TABLE, "row-6",
+        newRecordMutation(FAMILY, "record-6A",
+            newColumn("testcol12", "value110"), newColumn("testcol13", "value102")),
+        newRecordMutation(FAMILY, "record-6B",
+            newColumn("testcol12", "value101"), newColumn("testcol13", "value104")),
+        newRecordMutation(FAMILY2, "record-6C",
+            newColumn("testcol18", "value501")));
+
+    RowMutation row7 = newRowMutation(TABLE, "row-7",
+        newRecordMutation(FAMILY, "record-7A",
+            newColumn("testcol12", "value101"), newColumn("testcol13", "value102")),
+        newRecordMutation(FAMILY2, "record-7B",
+            newColumn("testcol18", "value501")));
+    // NOTE(review): presumably makes the final mutate call block until the
+    // data is searchable -- confirm against IndexManager.mutate.
+    row7.waitToBeVisible = true;
+
+    RowMutation[] inOrder = { row1, row2, row3, row4, row5, row6, row7 };
+    for (RowMutation mutation : inOrder) {
+      indexManager.mutate(mutation);
+    }
+  }
+
+  /**
+   * Runs a super query that requires a match in both column families and
+   * expects exactly one row; each hit must be fetchable as a row result
+   * (not a record result).
+   *
+   * NOTE(review): the query and assertions are the same as in
+   * testQueryWithJoin -- confirm whether a different query was intended.
+   */
+  @Test
+  public void testQueryWithJoinAll() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+super:<+test-family.testcol12:value101 +test-family.testcol13:value102> +super:<test-family2.testcol18:value501>";
+
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // JUnit expects (expected, actual); the right order keeps failure messages truthful.
+    assertEquals(1L, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Runs a super query that requires a match in both column families and
+   * expects exactly one row; each hit must be fetchable as a row result
+   * (not a record result).
+   */
+  @Test
+  public void testQueryWithJoin() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+super:<+test-family.testcol12:value101 +test-family.testcol13:value102> +super:<test-family2.testcol18:value501>";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // JUnit expects (expected, actual); the right order keeps failure messages truthful.
+    assertEquals(1L, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Joins two explicitly wrapped super sub-queries on the same column family
+   * and checks that one row is reported and fetchable as a row result.
+   */
+  @Test
+  public void testQueryWithJoinForcingSuperQuery() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+super:<test-family.testcol1:value1> +super:<test-family.testcol3:value234123>";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(1, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Queries with two facets, one of them a wildcard, and verifies the total
+   * hit count, the per-facet counts, and that the query drops out of
+   * currentQueries after a pause.
+   */
+  @Test
+  public void testQueryWithFacetsWithWildCard() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value*", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+    // One slot per facet, in the order the facets were listed above.
+    AtomicLongArray facetedCounts = new AtomicLongArray(2);
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+
+    assertEquals(2, facetedCounts.get(0));
+    assertEquals(0, facetedCounts.get(1));
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Fetches a row through the location id SHARD_NAME + "/0" and compares it
+   * with the expected contents of "row-1".
+   */
+  @Test
+  public void testFetchRowByLocationId() throws Exception {
+    Selector byLocation = new Selector();
+    byLocation.setLocationId(SHARD_NAME + "/0");
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, byLocation, fetched);
+    assertNotNull(fetched.rowResult.row);
+
+    Record expectedRecord = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    Row expectedRow = newRow("row-1", expectedRecord);
+    expectedRow.recordCount = 1;
+    assertEquals(expectedRow, fetched.rowResult.row);
+  }
+
+  /**
+   * Fetching by the location id "shard4/0" must be rejected with a
+   * BlurException.
+   */
+  @Test
+  public void testFetchMissingRowByLocationId() throws Exception {
+    try {
+      FetchResult fetched = new FetchResult();
+      Selector byLocation = new Selector().setLocationId("shard4/0");
+      indexManager.fetchRow(TABLE, byLocation, fetched);
+      fail("Should throw exception");
+    } catch (BlurException expected) {
+      // Reaching this handler is the passing outcome; nothing else to verify.
+    }
+  }
+
+  /**
+   * Fetches by location id with recordOnly set and checks that a single
+   * record (row id, record id, family and columns) comes back instead of a row.
+   */
+  @Test
+  public void testFetchRecordByLocationId() throws Exception {
+    Selector recordSelector = new Selector();
+    recordSelector.setLocationId(SHARD_NAME + "/0");
+    recordSelector.setRecordOnly(true);
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+    assertNull(fetched.rowResult);
+
+    FetchRecordResult recordResult = fetched.recordResult;
+    assertNotNull(recordResult.record);
+
+    assertEquals("row-1", recordResult.rowid);
+    assertEquals("record-1", recordResult.record.recordId);
+    assertEquals(FAMILY, recordResult.record.family);
+
+    Record expected = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    assertEquals(expected, recordResult.record);
+  }
+
+  /**
+   * Fetches "row-1" by row id and compares it with the expected single-record
+   * row.
+   */
+  @Test
+  public void testFetchRowByRowId() throws Exception {
+    Selector byRowId = new Selector();
+    byRowId.setRowId("row-1");
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, byRowId, fetched);
+    assertNotNull(fetched.rowResult.row);
+
+    Record expectedRecord = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    Row expectedRow = newRow("row-1", expectedRecord);
+    expectedRow.recordCount = 1;
+    assertEquals(expectedRow, fetched.rowResult.row);
+  }
+  
+  /**
+   * Pages through "row-6" one record at a time (start offsets 0, 1, 2) and
+   * checks each page, then checks that offset 3 yields no row.
+   */
+  @Test
+  public void testFetchRowByRowIdPaging() throws Exception {
+    // Page 1: start 0, one record.
+    Selector firstPage = new Selector().setRowId("row-6").setStartRecord(0).setMaxRecordsToFetch(1);
+    FetchResult firstResult = new FetchResult();
+    indexManager.fetchRow(TABLE, firstPage, firstResult);
+    assertNotNull(firstResult.rowResult.row);
+
+    Row expectedFirst = newRow("row-6", newRecord(FAMILY, "record-6A", newColumn("testcol12", "value110"), newColumn("testcol13", "value102")));
+    expectedFirst.recordCount = 1;
+    assertEquals(expectedFirst, firstResult.rowResult.row);
+
+    // Page 2: start 1, one record.
+    Selector secondPage = new Selector().setRowId("row-6").setStartRecord(1).setMaxRecordsToFetch(1);
+    FetchResult secondResult = new FetchResult();
+    indexManager.fetchRow(TABLE, secondPage, secondResult);
+    assertNotNull(secondResult.rowResult.row);
+
+    Row expectedSecond = newRow("row-6", newRecord(FAMILY, "record-6B", newColumn("testcol12", "value101"), newColumn("testcol13", "value104")));
+    expectedSecond.recordCount = 1;
+    assertEquals(expectedSecond, secondResult.rowResult.row);
+
+    // Page 3: start 2, one record (the FAMILY2 record).
+    Selector thirdPage = new Selector().setRowId("row-6").setStartRecord(2).setMaxRecordsToFetch(1);
+    FetchResult thirdResult = new FetchResult();
+    indexManager.fetchRow(TABLE, thirdPage, thirdResult);
+    assertNotNull(thirdResult.rowResult.row);
+
+    Row expectedThird = newRow("row-6", newRecord(FAMILY2, "record-6C", newColumn("testcol18", "value501")));
+    expectedThird.recordCount = 1;
+    assertEquals(expectedThird, thirdResult.rowResult.row);
+
+    // Past the last record: a row result is still returned, but without a row.
+    Selector pastEnd = new Selector().setRowId("row-6").setStartRecord(3).setMaxRecordsToFetch(1);
+    FetchResult pastEndResult = new FetchResult();
+    indexManager.fetchRow(TABLE, pastEnd, pastEndResult);
+    assertNull(pastEndResult.rowResult.row);
+  }
+
+  /**
+   * A selector carrying only a record id (no row id) is invalid; fetchRow must
+   * reject it with a BlurException.
+   */
+  @Test
+  public void testFetchRowByRecordIdOnly() throws Exception {
+    Selector selector = new Selector().setRecordId("record-1");
+    FetchResult fetchResult = new FetchResult();
+    try {
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      fail("Invalid selector should throw exception.");
+    } catch (BlurException expected) {
+      // do nothing, this is a pass
+    }
+    // Any other exception propagates (the method declares "throws Exception"),
+    // so JUnit reports it with its full stack trace instead of a message-less
+    // fail() plus a stack trace dumped to stderr.
+  }
+
+  /**
+   * A selector with a row id and a record id but without the recordOnly flag
+   * is invalid; fetchRow must reject it with a BlurException.
+   */
+  @Test
+  public void testFetchRowByRecordIdOnlyNoRecordOnly() throws Exception {
+    Selector selector = new Selector().setRowId("row-1").setRecordId("record-1");
+    FetchResult fetchResult = new FetchResult();
+    try {
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      fail("Invalid selector should throw exception.");
+    } catch (BlurException expected) {
+      // do nothing, this is a pass
+    }
+    // Unexpected exception types are left to propagate so the test report
+    // carries the real cause rather than a bare fail().
+  }
+
+  /**
+   * Fetches a single record addressed by row id + record id with recordOnly
+   * set and verifies the result flags, table name and record contents.
+   */
+  @Test
+  public void testFetchRowByRecordId() throws Exception {
+    Selector recordSelector = new Selector();
+    recordSelector.setRowId("row-1");
+    recordSelector.setRecordId("record-1");
+    recordSelector.setRecordOnly(true);
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+
+    assertFalse(fetched.deleted);
+    assertTrue(fetched.exists);
+    assertEquals(TABLE, fetched.table);
+    assertNull(fetched.rowResult);
+    assertNotNull(fetched.recordResult);
+
+    FetchRecordResult actual = fetched.recordResult;
+    assertEquals(FAMILY, actual.record.family);
+    assertEquals("record-1", actual.record.recordId);
+    assertEquals("row-1", actual.rowid);
+
+    Record expected = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    assertEquals(expected, actual.record);
+  }
+
+  /**
+   * Checks recordFrequency for FAMILY/testcol1: "value1" is expected to be
+   * counted twice, and a value that was never indexed is expected to be
+   * counted zero times.
+   */
+  @Test
+  public void testRecordFrequency() throws Exception {
+    assertEquals(2, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "value1"));
+    assertEquals(0, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "NO VALUE"));
+  }
+
+  /**
+   * Verifies the table name, the set of column families and the column names
+   * of each family reported by the schema call. Sets are compared as TreeSets
+   * so iteration order of the reported collections does not matter.
+   */
+  @Test
+  public void testSchema() throws Exception {
+    Schema schema = indexManager.schema(TABLE);
+    assertEquals(TABLE, schema.table);
+    Map<String, Set<String>> families = schema.columnFamilies;
+
+    Set<String> expectedFamilies = new TreeSet<String>(Arrays.asList(FAMILY, FAMILY2));
+    assertEquals(expectedFamilies, new TreeSet<String>(families.keySet()));
+
+    Set<String> expectedFamilyColumns = new TreeSet<String>(Arrays.asList("testcol1", "testcol2", "testcol3", "testcol12", "testcol13"));
+    assertEquals(expectedFamilyColumns, new TreeSet<String>(families.get(FAMILY)));
+
+    Set<String> expectedFamily2Columns = new TreeSet<String>(Arrays.asList("testcol18"));
+    assertEquals(expectedFamily2Columns, new TreeSet<String>(families.get(FAMILY2)));
+  }
+
+  /**
+   * Runs a super query, checks that two rows are reported and fetchable as
+   * rows, and that the query leaves currentQueries after a pause.
+   */
+  @Test
+  public void testQuerySuperQueryTrue() throws Exception {
+    SimpleQuery simpleQuery = new SimpleQuery();
+    simpleQuery.queryStr = "test-family.testcol1:value1";
+    simpleQuery.superQueryOn = true;
+    simpleQuery.type = ScoreType.SUPER;
+
+    BlurQuery query = new BlurQuery();
+    query.simpleQuery = simpleQuery;
+    query.fetch = 10;
+    query.minimumNumberOfResults = Long.MAX_VALUE;
+    query.maxQueryTime = Long.MAX_VALUE;
+    query.uuid = 1;
+
+    BlurResultIterable hits = indexManager.query(TABLE, query, null);
+    assertEquals(2, hits.getTotalResults());
+    for (BlurResult hit : hits) {
+      FetchResult fetched = new FetchResult();
+      Selector byLocation = new Selector().setLocationId(hit.getLocationId());
+      indexManager.fetchRow(TABLE, byLocation, fetched);
+      assertNotNull(fetched.rowResult);
+      assertNull(fetched.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Runs a super query with a selector attached to the query itself, so each
+   * result already carries its fetch result; every hit must hold a row result.
+   */
+  @Test
+  public void testQuerySuperQueryTrueWithSelector() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.selector = new Selector();
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      assertNotNull(result.fetchResult.rowResult);
+      assertNull(result.fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Runs a record-level query (superQueryOn = false) and fetches every hit
+   * with a recordOnly selector; each fetch must return a record, not a row.
+   */
+  @Test
+  public void testQuerySuperQueryFalse() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = false;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId()).setRecordOnly(true);
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNull(fetchResult.rowResult);
+      assertNotNull(fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Runs a record-level query with a recordOnly selector attached to the query
+   * itself; every result must already carry a record result and no row result.
+   */
+  @Test
+  public void testQuerySuperQueryFalseWithSelector() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = false;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.selector = new Selector();
+    blurQuery.selector.setRecordOnly(true);
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      assertNull(result.fetchResult.rowResult);
+      assertNotNull(result.fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Queries with a recordOnly selector and otherwise default query settings,
+   * then checks that the two hits are exactly "record-1" and "record-4", once
+   * each.
+   */
+  @Test
+  public void testQueryRecordOnly() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.selector = new Selector();
+    blurQuery.selector.setRecordOnly(true);
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+
+    int matchRecord1 = 0;
+    int matchRecord4 = 0;
+
+    for (BlurResult result : iterable) {
+      assertNull(result.fetchResult.rowResult);
+      assertNotNull(result.fetchResult.recordResult);
+
+      Record r = result.fetchResult.recordResult.record;
+
+      // Compare from the literal side so a null record id ends up in the
+      // descriptive fail() below instead of a NullPointerException.
+      if ("record-1".equals(r.getRecordId())) {
+        matchRecord1 += 1;
+      } else if ("record-4".equals(r.getRecordId())) {
+        matchRecord4 += 1;
+      } else {
+        fail("Unexpected record ID [" + r.getRecordId() + "]");
+      }
+    }
+
+    assertEquals("Unexpected number of record-1 results", 1, matchRecord1);
+    assertEquals("Unexpected number of record-4 results", 1, matchRecord4);
+  }
+
+  /**
+   * Queries with two exact-term facets and verifies the total hit count, the
+   * per-facet counts, and that the query drops out of currentQueries after a
+   * pause.
+   */
+  @Test
+  public void testQueryWithFacets() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value1", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+    // One slot per facet, in the order the facets were listed above.
+    AtomicLongArray facetedCounts = new AtomicLongArray(2);
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+    // Expected value first, per the JUnit assertEquals contract.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+
+    assertEquals(2, facetedCounts.get(0));
+    assertEquals(0, facetedCounts.get(1));
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Lists up to 100 terms of FAMILY/testcol1 starting from the empty string
+   * and compares them, in order, with the expected values.
+   */
+  @Test
+  public void testTerms() throws Exception {
+    List<String> expected = Arrays.asList("value1", "value13", "value16", "value4", "value7");
+    List<String> actual = indexManager.terms(TABLE, FAMILY, "testcol1", "", (short) 100);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Replaces "row-4" with a single new record and verifies that a fetch by row
+   * id returns exactly that record.
+   */
+  @Test
+  public void testMutationReplaceRow() throws Exception {
+    RecordMutation replacement = newRecordMutation(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4"));
+    RowMutation replaceRow = newRowMutation(TABLE, "row-4", replacement);
+    replaceRow.waitToBeVisible = true;
+    indexManager.mutate(replaceRow);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-4"), fetched);
+    assertNotNull(fetched.rowResult.row);
+
+    Record expectedRecord = newRecord(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4"));
+    Row expectedRow = newRow("row-4", expectedRecord);
+    expectedRow.recordCount = 1;
+    assertEquals(expectedRow, fetched.rowResult.row);
+  }
+
+  /**
+   * Issues a replace-row mutation for a row id that is not in the fixture and
+   * verifies that the row is created with exactly the supplied record.
+   */
+  @Test
+  public void testMutationReplaceMissingRow() throws Exception {
+    // "row-6" is populated by the fixture (testFetchRowByRowIdPaging reads
+    // three records from it), so replacing it never exercised the missing-row
+    // path. Use an id in the same range as the "row-100"/"row-101" ids that the
+    // other missing-row tests rely on being absent.
+    String rowId = "row-102";
+    String rec = "record-102";
+    Column c1 = newColumn("testcol1", "value20");
+    Column c2 = newColumn("testcol2", "value21");
+    Column c3 = newColumn("testcol3", "value22");
+    RecordMutation rm = newRecordMutation(FAMILY, rec, c1, c2, c3);
+    RowMutation mutation = newRowMutation(TABLE, rowId, rm);
+    mutation.waitToBeVisible = true;
+    indexManager.mutate(mutation);
+
+    Selector selector = new Selector().setRowId(rowId);
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    Row r = fetchResult.rowResult.row;
+    assertNotNull("new row should exist", r);
+    Row row = newRow(rowId, newRecord(FAMILY, rec, newColumn("testcol1", "value20"), newColumn("testcol2", "value21"), newColumn("testcol3", "value22")));
+    row.recordCount = 1;
+    assertEquals("row should match", row, r);
+  }
+
+  /**
+   * Deletes "row-2" and verifies that a later fetch by row id yields no row
+   * result.
+   */
+  @Test
+  public void testMutationDeleteRow() throws Exception {
+    RowMutation deleteRow = newRowMutation(DELETE_ROW, TABLE, "row-2");
+    deleteRow.waitToBeVisible = true;
+    indexManager.mutate(deleteRow);
+
+    FetchResult fetched = new FetchResult();
+    Selector byRowId = new Selector();
+    byRowId.setRowId("row-2");
+    indexManager.fetchRow(TABLE, byRowId, fetched);
+    assertNull("row should be deleted", fetched.rowResult);
+  }
+
+  /**
+   * Deletes a row id that is not in the fixture and verifies that the
+   * mutation is accepted and the row still does not exist afterwards.
+   */
+  @Test
+  public void testMutationDeleteMissingRow() throws Exception {
+    // "row-6" is created by the fixture (see testFetchRowByRowIdPaging), so
+    // deleting it only repeated testMutationDeleteRow. Use an id that is
+    // assumed absent, like the "row-100"/"row-101" ids of the other
+    // missing-row tests.
+    String rowId = "row-103";
+    RowMutation mutation = newRowMutation(DELETE_ROW, TABLE, rowId);
+    mutation.waitToBeVisible = true;
+    indexManager.mutate(mutation);
+
+    Selector selector = new Selector().setRowId(rowId);
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    assertNull("row should not exist", fetchResult.rowResult);
+  }
+
+  /**
+   * Updates "row-3" by deleting "record-3" and verifies that the row can no
+   * longer be fetched.
+   */
+  @Test
+  public void testMutationUpdateRowDeleteLastRecord() throws Exception {
+    RecordMutation deleteRecord = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-3");
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-3", deleteRecord);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-3"), fetched);
+    assertNull("row should not exist", fetched.rowResult);
+  }
+
+  @Test
+  public void testMutationUpdateRowDeleteRecord() throws Exception {
+    RecordMutation rm = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-5A");
+
+    RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, "row-5", rm);
+    rowMutation.waitToBeVisible = true;
+    indexManager.mutate(rowMutation);
+
+    Selector selector = new Selector().setRowId("row-5");
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    assertNotNull("row should exist", fetchResult.rowResult);
+    assertNotNull("row should exist", fetchResult.rowResult.row);
+    assertEquals("row should have one record", 1, fetchResult.rowResult.row.getRecordsSize());
+  }
+
+  /**
+   * An update-row mutation that deletes a record from "row-101" is expected to
+   * fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowDeleteRecord() throws Exception {
+    RecordMutation deleteRecord = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-101");
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-101", deleteRecord);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+  }
+
+  @Test
+  public void testMutationUpdateRowReplaceExistingRecord() throws Exception {
+    Column c1 = newColumn("testcol4", "value104");
+    Column c2 = newColumn("testcol5", "value105");
+    Column c3 = newColumn("testcol6", "value105");
+    String rec = "record-5A";
+    RecordMutation rm = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, rec, c1, c2, c3);
+
+    Record r = updateAndFetchRecord("row-5", rec, rm);
+
+    assertNotNull("record should exist", r);
+    assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+    assertTrue("column 1 should be in record", r.columns.contains(c1));
+    assertTrue("column 2 should be in record", r.columns.contains(c2));
+    assertTrue("column 3 should be in record", r.columns.contains(c3));
+  }
+
+  @Test
+  public void testMutationUpdateRowReplaceMissingRecord() throws Exception {
+    Column c1 = newColumn("testcol4", "value104");
+    Column c2 = newColumn("testcol5", "value105");
+    Column c3 = newColumn("testcol6", "value105");
+    String rec = "record-5C";
+    RecordMutation rm = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, rec, c1, c2, c3);
+
+    Record r = updateAndFetchRecord("row-5", rec, rm);
+
+    assertNotNull("record should exist", r);
+    assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+    assertTrue("column 1 should be in record", r.columns.contains(c1));
+    assertTrue("column 2 should be in record", r.columns.contains(c2));
+    assertTrue("column 3 should be in record", r.columns.contains(c3));
+  }
+
+  /**
+   * Applies one update with two replace-record mutations ("record-5A" and
+   * "record-5C") to "row-5", then classifies every record of the fetched row
+   * with match(...) and expects one hit per mutation plus one untouched record.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceMixedRecords() throws Exception {
+    RecordMutation replaceFirst = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5A",
+        newColumn("testcol4", "value104"), newColumn("testcol5", "value105"), newColumn("testcol6", "value105"));
+    RecordMutation replaceSecond = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5C",
+        newColumn("testcol4", "value104"), newColumn("testcol5", "value105"), newColumn("testcol6", "value105"));
+
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-5", replaceFirst, replaceSecond);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-5"), fetched);
+    Row row = fetched.rowResult.row;
+    assertNotNull("row should exist", row);
+    assertEquals("only 3 records in row", 3, row.getRecordsSize());
+
+    int firstHits = 0;
+    int secondHits = 0;
+    int untouched = 0;
+    for (Record candidate : row.records) {
+      // The first mutation takes precedence, as in an if/else-if chain.
+      if (match(replaceFirst, candidate)) {
+        firstHits++;
+        continue;
+      }
+      if (match(replaceSecond, candidate)) {
+        secondHits++;
+        continue;
+      }
+      untouched++;
+    }
+    assertEquals("matching record should be updated", 1, firstHits);
+    assertEquals("missing record should be added", 1, secondHits);
+    assertEquals("unmodified record should exist", 1, untouched);
+  }
+
+  /**
+   * An update-row mutation that replaces a record in "row-100" is expected to
+   * fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowReplaceRecord() throws Exception {
+    String recordId = "record-100";
+    RecordMutation replace = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, recordId,
+        newColumn("testcol1", "value104"), newColumn("testcol2", "value105"), newColumn("testcol3", "value105"));
+
+    updateAndFetchRecord("row-100", recordId, replace);
+  }
+
+  /**
+   * Replaces testcol1 and testcol2 of "record-1" and verifies that both new
+   * values are stored while testcol3 keeps its original value.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceExistingColumns() throws Exception {
+    String recordId = "record-1";
+    Column newCol1 = newColumn("testcol1", "value999");
+    Column newCol2 = newColumn("testcol2", "value9999");
+    RecordMutation replaceColumns = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, newCol1, newCol2);
+
+    Record stored = updateAndFetchRecord("row-1", recordId, replaceColumns);
+
+    assertNotNull("record should exist", stored);
+    assertEquals("only 3 columns in record", 3, stored.getColumnsSize());
+    assertTrue("column 1 should be in record", stored.columns.contains(newCol1));
+    assertTrue("column 2 should be in record", stored.columns.contains(newCol2));
+
+    // Scan for the untouched testcol3=value3 pair, stopping at the first hit.
+    boolean untouchedFound = false;
+    for (Column candidate : stored.columns) {
+      untouchedFound = candidate.name.equals("testcol3") && candidate.value.equals("value3");
+      if (untouchedFound) {
+        break;
+      }
+    }
+    assertTrue("column 3 should be unmodified", untouchedFound);
+  }
+
+  /**
+   * Replaces testcol3 of "record-5B" with a single value and verifies that no
+   * column with the same name but a different value remains in the record.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceExistingDuplicateColumns() throws Exception {
+    String recordId = "record-5B";
+    Column replacement = newColumn("testcol3", "value999");
+    RecordMutation replaceColumns = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, replacement);
+
+    Record stored = updateAndFetchRecord("row-5", recordId, replaceColumns);
+
+    assertNotNull("record should exist", stored);
+    assertEquals("only 3 columns in record", 3, stored.getColumnsSize());
+    assertTrue("new column should be in record", stored.columns.contains(replacement));
+
+    // A leftover is any column sharing the name but carrying another value.
+    boolean leftoverFound = false;
+    for (Column candidate : stored.columns) {
+      leftoverFound = candidate.name.equals(replacement.name) && !candidate.value.equals(replacement.value);
+      if (leftoverFound) {
+        break;
+      }
+    }
+    assertFalse("duplicate columns should be removed", leftoverFound);
+  }
+
+  /**
+   * Issues a replace-columns mutation for testcol4 and testcol5 on "record-1"
+   * and verifies the record then holds five columns including the two new ones.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceMissingColumns() throws Exception {
+    String recordId = "record-1";
+    Column added1 = newColumn("testcol4", "value999");
+    Column added2 = newColumn("testcol5", "value9999");
+    RecordMutation replaceColumns = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, added1, added2);
+
+    Record stored = updateAndFetchRecord("row-1", recordId, replaceColumns);
+
+    assertNotNull("record should exist", stored);
+    assertEquals("only 5 columns in record", 5, stored.getColumnsSize());
+    assertTrue("column 1 should be in record", stored.columns.contains(added1));
+    assertTrue("column 2 should be in record", stored.columns.contains(added2));
+  }
+
+  /**
+   * Issues a replace-columns mutation for testcol1 and testcol4 on "record-1"
+   * and verifies the record then holds four columns including both values.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceMixedColumns() throws Exception {
+    String recordId = "record-1";
+    Column replaced = newColumn("testcol1", "value999");
+    Column added = newColumn("testcol4", "value9999");
+    RecordMutation replaceColumns = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, replaced, added);
+
+    Record stored = updateAndFetchRecord("row-1", recordId, replaceColumns);
+
+    assertNotNull("record should exist", stored);
+    assertEquals("only 4 columns in record", 4, stored.getColumnsSize());
+    assertTrue("column 1 should be in record", stored.columns.contains(replaced));
+    assertTrue("column 2 should be in record", stored.columns.contains(added));
+  }
+
+  /**
+   * A replace-columns mutation addressed to "record-1B" in "row-1" is expected
+   * to fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateRowMissingRecordReplaceColumns() throws Exception {
+    String recordId = "record-1B";
+    RecordMutation replaceColumns = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId,
+        newColumn("testcol4", "value999"), newColumn("testcol5", "value9999"));
+
+    updateAndFetchRecord("row-1", recordId, replaceColumns);
+  }
+
+  /**
+   * A replace-columns update addressed to a row that is not in the fixture is
+   * expected to fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowReplaceColumns() throws Exception {
+    // "row-6" is created by the fixture (see testFetchRowByRowIdPaging), so
+    // with that id the failure came from the missing record rather than the
+    // missing row. Use an id that is assumed absent, like the
+    // "row-100"/"row-101" ids of the other missing-row tests.
+    Column c1 = newColumn("testcol1", "value999");
+    Column c2 = newColumn("testcol2", "value9999");
+    String rec = "record-104";
+    RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+    updateAndFetchRecord("row-104", rec, rm);
+  }
+
+  /**
+   * Appends values for testcol1, testcol2 and testcol4 to "record-1" and
+   * verifies the resulting column multiset: two values each for testcol1 and
+   * testcol2, one each for testcol3 and testcol4, and nothing else.
+   */
+  @Test
+  public void testMutationUpdateRowAppendColumns() throws Exception {
+    String recordId = "record-1";
+    Column appended1 = newColumn("testcol1", "value999");
+    Column appended2 = newColumn("testcol2", "value9999");
+    Column appended3 = newColumn("testcol4", "hmm");
+    RecordMutation append = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, recordId, appended1, appended2, appended3);
+
+    Record stored = updateAndFetchRecord("row-1", recordId, append);
+
+    assertNotNull("record should exist", stored);
+    assertEquals("only 6 columns in record", 6, stored.getColumnsSize());
+    assertTrue("column 1 should be in record", stored.columns.contains(appended1));
+    assertTrue("column 2 should be in record", stored.columns.contains(appended2));
+    assertTrue("column 3 should be in record", stored.columns.contains(appended3));
+
+    // Tally columns per name; anything unexpected lands in "unexpected".
+    int col1Count = 0;
+    int col2Count = 0;
+    int col3Count = 0;
+    int col4Count = 0;
+    int unexpected = 0;
+    for (Column candidate : stored.columns) {
+      String name = candidate.name;
+      if (name.equals("testcol1")) {
+        col1Count++;
+      } else if (name.equals("testcol2")) {
+        col2Count++;
+      } else if (name.equals("testcol3")) {
+        col3Count++;
+      } else if (name.equals("testcol4")) {
+        col4Count++;
+      } else {
+        unexpected++;
+      }
+    }
+    assertEquals("should append testcol1", 2, col1Count);
+    assertEquals("should append testcol2", 2, col2Count);
+    assertEquals("should not append testcol3", 1, col3Count);
+    assertEquals("should append testcol4", 1, col4Count);
+    assertEquals("should not find other columns", 0, unexpected);
+  }
+
+  /**
+   * An append-column-values mutation addressed to "record-1B" in "row-1" is
+   * expected to fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateRowMissingRecordAppendColumns() throws Exception {
+    String recordId = "record-1B";
+    RecordMutation append = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, recordId,
+        newColumn("testcol1", "value999"), newColumn("testcol2", "value9999"), newColumn("testcol4", "hmm"));
+
+    updateAndFetchRecord("row-1", recordId, append);
+  }
+
+  /**
+   * An append-column-values update addressed to a row that is not in the
+   * fixture is expected to fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowAppendColumns() throws Exception {
+    // "row-6" is created by the fixture (see testFetchRowByRowIdPaging), so
+    // with that id the failure came from the missing record rather than the
+    // missing row. Use an id that is assumed absent, like the
+    // "row-100"/"row-101" ids of the other missing-row tests.
+    Column c1 = newColumn("testcol1", "value999");
+    Column c2 = newColumn("testcol2", "value9999");
+    String rec = "record-105";
+    RecordMutation rm = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, rec, c1, c2);
+
+    updateAndFetchRecord("row-105", rec, rm);
+  }
+
+  /**
+   * Applies the given record mutations to a row as one UPDATE_ROW mutation
+   * (with waitToBeVisible set) and then fetches a single record of that row.
+   *
+   * @param rowId id of the row to update and read back
+   * @param recordId id of the record to fetch after the update
+   * @param recordMutations record-level changes to apply in the update
+   * @return the fetched record, or null when the fetch produced no record result
+   * @throws Exception whatever mutate or fetchRow throws
+   */
+  private Record updateAndFetchRecord(String rowId, String recordId, RecordMutation... recordMutations) throws Exception {
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, rowId, recordMutations);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    Selector recordSelector = new Selector().setRowId(rowId).setRecordId(recordId).setRecordOnly(true);
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+    if (fetched.recordResult == null) {
+      return null;
+    }
+    return fetched.recordResult.record;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
new file mode 100644
index 0000000..0416134
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -0,0 +1,260 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests {@link ZookeeperClusterStatus} against the ZooKeeper instance started
+ * through {@link MiniCluster}. Most tests check both the uncached
+ * (useCache == false) answer and, via polling, the cached answer.
+ */
+public class ZookeeperClusterStatusTest {
+
+  private static final String TEST = "test";
+  private static final String DEFAULT = "default";
+
+  private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatusTest.class);
+  private ZooKeeper zooKeeper;
+  private ZookeeperClusterStatus clusterStatus;
+
+  /** Re-declares {@code initializeAndRun} as public, delegating to the superclass. */
+  public static class QuorumPeerMainRun extends QuorumPeerMain {
+    @Override
+    public void initializeAndRun(String[] args) throws ConfigException, IOException {
+      super.initializeAndRun(args);
+    }
+  }
+
+  @BeforeClass
+  public static void setupOnce() throws InterruptedException, IOException, KeeperException {
+    MiniCluster.startZooKeeper("./tmp/zk_test");
+  }
+
+  @AfterClass
+  public static void teardownOnce() {
+    MiniCluster.shutdownZooKeeper();
+  }
+
+  /** Opens a fresh client, sets up the "default" cluster and creates the status object. */
+  @Before
+  public void setup() throws KeeperException, InterruptedException, IOException {
+    zooKeeper = new ZooKeeperClient(MiniCluster.getZkConnectionString(), 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Session events are not needed by these tests.
+      }
+    });
+    BlurUtil.setupZookeeper(zooKeeper, DEFAULT);
+    clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    clusterStatus.close();
+    zooKeeper.close();
+  }
+
+  @Test
+  public void testGetClusterList() {
+    LOG.warn("testGetClusterList");
+    List<String> clusterList = clusterStatus.getClusterList(false);
+    assertEquals(Arrays.asList(DEFAULT), clusterList);
+  }
+
+  @Test
+  public void testSafeModeNotSet() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeNotSet");
+    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(false);
+  }
+
+  @Test
+  public void testSafeModeSetInPast() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeSetInPast");
+    setSafeModeInPast();
+    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(false);
+  }
+
+  @Test
+  public void testSafeModeSetInFuture() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeSetInFuture");
+    setSafeModeInFuture();
+    assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(true);
+  }
+
+  @Test
+  public void testGetClusterNoTable() {
+    LOG.warn("testGetClusterNoTable");
+    assertNull(clusterStatus.getCluster(false, TEST));
+    assertNull(clusterStatus.getCluster(true, TEST));
+  }
+
+  @Test
+  public void testGetClusterTable() throws KeeperException, InterruptedException {
+    LOG.warn("testGetClusterTable");
+    createTable(TEST);
+    assertEquals(DEFAULT, clusterStatus.getCluster(false, TEST));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.getCluster(true, TEST);
+      }
+    }.test(DEFAULT);
+  }
+
+  @Test
+  public void testGetTableList() {
+    // NOTE(review): this test does not create the "test" table itself, so it
+    // appears to rely on another test having created it earlier - confirm.
+    assertEquals(Arrays.asList(TEST), clusterStatus.getTableList(false));
+  }
+
+  @Test
+  public void testIsEnabledNoTable() {
+    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "notable"));
+    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "notable"));
+  }
+
+  @Test
+  public void testIsEnabledDisabledTable() throws KeeperException, InterruptedException {
+    createTable("disabledtable", false);
+    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "disabledtable"));
+    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "disabledtable"));
+  }
+
+  @Test
+  public void testIsEnabledEnabledTable() throws KeeperException, InterruptedException {
+    createTable("enabledtable", true);
+    assertTrue(clusterStatus.isEnabled(false, DEFAULT, "enabledtable"));
+
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isEnabled(true, DEFAULT, "enabledtable");
+      }
+    }.test(true);
+  }
+
+  /** Creates an enabled table with the given name. */
+  private void createTable(String name) throws KeeperException, InterruptedException {
+    createTable(name, true);
+  }
+
+  /**
+   * Creates a table through the cluster status object and, when requested,
+   * additionally enables it.
+   */
+  private void createTable(String name, boolean enabled) throws KeeperException, InterruptedException {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName(name);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri("./tmp/zk_test_hdfs");
+    tableDescriptor.setIsEnabled(enabled);
+    clusterStatus.createTable(tableDescriptor);
+    if (enabled) {
+      clusterStatus.enableTable(tableDescriptor.getCluster(), name);
+    }
+  }
+
+  /**
+   * Polls {@link #run()} roughly once per millisecond until it yields the
+   * expected answer, failing the test once the configured wait time is exceeded.
+   */
+  public abstract class WaitForAnswerToBeCorrect {
+
+    private final long totalWaitTimeNanos;
+
+    public WaitForAnswerToBeCorrect(long totalWaitTimeMs) {
+      this.totalWaitTimeNanos = TimeUnit.MILLISECONDS.toNanos(totalWaitTimeMs);
+    }
+
+    /** Produces the current answer; may return null. */
+    public abstract Object run();
+
+    /**
+     * Repeats {@link #run()} until its result equals the expected value.
+     *
+     * @param o the expected answer (null is allowed)
+     */
+    public void test(Object o) {
+      long start = System.nanoTime();
+      while (true) {
+        Object object = run();
+        // Identity is checked first so a null answer is compared rather than
+        // dereferenced (the original order threw NullPointerException).
+        if (object == o || (object != null && object.equals(o))) {
+          return;
+        }
+        long now = System.nanoTime();
+        if (now - start > totalWaitTimeNanos) {
+          fail("Expected [" + o + "] but last answer was [" + object + "]");
+        }
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          // Preserve the interrupt for the test runner before failing.
+          Thread.currentThread().interrupt();
+          fail(e.getMessage());
+        }
+      }
+    }
+  }
+
+  /** Writes a safe-mode timestamp 60 seconds in the past. */
+  private void setSafeModeInPast() throws KeeperException, InterruptedException {
+    setSafeMode(System.currentTimeMillis() - 60000);
+  }
+
+  /** Writes a safe-mode timestamp 60 seconds in the future. */
+  private void setSafeModeInFuture() throws KeeperException, InterruptedException {
+    setSafeMode(System.currentTimeMillis() + 60000);
+  }
+
+  /**
+   * Stores the timestamp (as decimal text) in the safe-mode node of the
+   * "default" cluster, creating the node first when it does not exist.
+   */
+  private void setSafeMode(long timestamp) throws KeeperException, InterruptedException {
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+    Stat stat = zooKeeper.exists(blurSafemodePath, false);
+    byte[] data = Long.toString(timestamp).getBytes();
+    if (stat == null) {
+      zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+    // Version -1 overwrites the data regardless of the node's current version.
+    zooKeeper.setData(blurSafemodePath, data, -1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
new file mode 100644
index 0000000..f3136ea
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
@@ -0,0 +1,116 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.junit.Test;
+
+
+public class DistributedLayoutManagerTest {
+
+  @Test
+  public void testLayoutManager() {
+    TreeSet<String> nodes = new TreeSet<String>();
+    nodes.add("n1");
+    nodes.add("n2");
+    nodes.add("n3");
+
+    TreeSet<String> nodesOffline = new TreeSet<String>();
+    nodesOffline.add("n2");
+
+    TreeSet<String> shards = new TreeSet<String>();
+    shards.add("s1");
+    shards.add("s2");
+    shards.add("s3");
+    shards.add("s4");
+    shards.add("s5");
+
+    DistributedLayoutManager layoutManager1 = new DistributedLayoutManager();
+    layoutManager1.setNodes(nodes);
+    layoutManager1.setShards(shards);
+    layoutManager1.init();
+    Map<String, String> layout1 = layoutManager1.getLayout();
+
+    DistributedLayoutManager layoutManager2 = new DistributedLayoutManager();
+    layoutManager2.setNodes(nodes);
+    layoutManager2.setShards(shards);
+    layoutManager2.setNodesOffline(nodesOffline);
+    layoutManager2.init();
+    Map<String, String> layout2 = layoutManager2.getLayout();
+
+    assertEquals(shards, new TreeSet<String>(layout1.keySet()));
+    assertEquals(nodes, new TreeSet<String>(layout1.values()));
+
+    assertEquals(shards, new TreeSet<String>(layout2.keySet()));
+    TreeSet<String> nodesOnline = new TreeSet<String>(nodes);
+    nodesOnline.removeAll(nodesOffline);
+    assertEquals(nodesOnline, new TreeSet<String>(layout2.values()));
+
+  }
+
+  @Test
+  public void testLayoutManagerPerformance() {
+    DistributedLayoutManager perfTest = new DistributedLayoutManager();
+    perfTest.setNodes(getTestNodes());
+    perfTest.setShards(getTestShards());
+    perfTest.setNodesOffline(getTestOfflineNodes());
+    perfTest.init();
+    int testSize = 100000;
+    for (int i = 0; i < testSize; i++) {
+      perfTest.getLayout();
+    }
+    long s = System.nanoTime();
+    for (int i = 0; i < testSize; i++) {
+      perfTest.getLayout();
+    }
+    long e = System.nanoTime();
+    double ms = (e - s) / 1000000.0;
+    System.out.println("Total    " + ms);
+    System.out.println("Per Call " + ms / testSize);
+    assertTrue(ms < 100);
+  }
+
+  private static Collection<String> getTestOfflineNodes() {
+    return Arrays.asList("n13");
+  }
+
+  private static Collection<String> getTestShards() {
+    Collection<String> shards = new HashSet<String>();
+    for (int i = 0; i < 701; i++) {
+      shards.add("s" + i);
+    }
+    return shards;
+  }
+
+  private static Collection<String> getTestNodes() {
+    Collection<String> nodes = new HashSet<String>();
+    for (int i = 0; i < 32; i++) {
+      nodes.add("n" + i);
+    }
+    return nodes;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java b/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
new file mode 100644
index 0000000..c568cd1
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
@@ -0,0 +1,155 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.MiniCluster;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests {@link SafeMode} node registration against the ZooKeeper instance
+ * started through {@link MiniCluster}.
+ */
+public class SafeModeTest {
+
+  private static String path = "./target/test-zk";
+  private static ZooKeeper zk;
+
+  @BeforeClass
+  public static void startZooKeeper() throws IOException {
+    new File(path).mkdirs();
+    MiniCluster.startZooKeeper(path);
+    zk = newZooKeeper();
+  }
+
+  @AfterClass
+  public static void stopZooKeeper() throws InterruptedException {
+    zk.close();
+    MiniCluster.shutdownZooKeeper();
+  }
+
+  /**
+   * Starts 32 registering threads 100 ms apart, waits for all of them, and
+   * checks that none failed and that all registrations completed within one
+   * second of each other.
+   */
+  @Test
+  public void testBasicStartup() throws IOException, InterruptedException {
+    List<AtomicReference<Throwable>> errors = new ArrayList<AtomicReference<Throwable>>();
+    List<AtomicLong> timeRegisteredLst = new ArrayList<AtomicLong>();
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 32; i++) {
+      AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
+      AtomicLong timeRegistered = new AtomicLong();
+      errors.add(ref);
+      timeRegisteredLst.add(timeRegistered);
+      threads.add(startThread(zk, "node" + i, ref, timeRegistered));
+      Thread.sleep(100);
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    for (AtomicReference<Throwable> t : errors) {
+      assertNull(t.get());
+    }
+
+    // Find the earliest and latest registration timestamps.
+    long oldest = -1;
+    long newest = -1;
+    for (AtomicLong time : timeRegisteredLst) {
+      long l = time.get();
+      if (oldest == -1 || l < oldest) {
+        oldest = l;
+      }
+      if (newest == -1 || l > newest) {
+        newest = l;
+      }
+    }
+    assertTrue((newest - oldest) < TimeUnit.SECONDS.toMillis(1));
+  }
+
+  /** Registers "node101" on its own client and requires the call to take under one second. */
+  @Test
+  public void testExtraNodeStartup() throws IOException, InterruptedException, KeeperException {
+    ZooKeeper client = newZooKeeper();
+    try {
+      SafeMode safeMode = new SafeMode(client, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS, 5,
+          TimeUnit.SECONDS, 60);
+      long s = System.nanoTime();
+      safeMode.registerNode("node101", null);
+      long e = System.nanoTime();
+
+      assertTrue((e - s) < TimeUnit.SECONDS.toNanos(1));
+    } finally {
+      // Close the client even when registration or the assertion fails.
+      client.close();
+    }
+  }
+
+  /** Registers "node10" on its own client and requires the call to throw an exception. */
+  @Test
+  public void testSecondNodeStartup() throws IOException, InterruptedException, KeeperException {
+    ZooKeeper client = newZooKeeper();
+    try {
+      SafeMode safeMode = new SafeMode(client, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS, 5,
+          TimeUnit.SECONDS, 15);
+      try {
+        safeMode.registerNode("node10", null);
+        fail("should throw exception.");
+      } catch (Exception expected) {
+        // Expected: the registration must not succeed. The AssertionError
+        // raised by fail() is not an Exception and therefore still propagates.
+      }
+    } finally {
+      // Close the client even when the test fails.
+      client.close();
+    }
+  }
+
+  /** Opens a ZooKeeper client to the mini cluster with a 20 second session timeout and a no-op watcher. */
+  private static ZooKeeper newZooKeeper() throws IOException {
+    return new ZooKeeper(MiniCluster.getZkConnectionString(), 20000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Events are not needed by these tests.
+      }
+    });
+  }
+
+  /**
+   * Starts a thread that registers the node through a new SafeMode instance.
+   * On success the completion time (epoch millis) is stored in
+   * {@code timeRegistered}; any Throwable is stored in {@code errorRef}.
+   *
+   * @return the already started thread
+   */
+  private Thread startThread(final ZooKeeper zk, final String node, final AtomicReference<Throwable> errorRef,
+      final AtomicLong timeRegistered) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SafeMode safeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS, 5,
+              TimeUnit.SECONDS, 60);
+          safeMode.registerNode(node, null);
+          timeRegistered.set(System.currentTimeMillis());
+        } catch (Throwable t) {
+          errorRef.set(t);
+        }
+      }
+    };
+    Thread thread = new Thread(runnable);
+    thread.start();
+    return thread;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java b/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
new file mode 100644
index 0000000..02107fd
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+import org.junit.Test;
+
+
+public class BlurResultPeekableIteratorComparatorTest {
+
+  @Test
+  public void testResultPeekableIteratorComparator() {
+    List<PeekableIterator<BlurResult>> results = new ArrayList<PeekableIterator<BlurResult>>();
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("5", 5))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("2", 2))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("1", 1))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("9", 1))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+
+    Collections.sort(results, BlurConstants.HITS_PEEKABLE_ITERATOR_COMPARATOR);
+
+    for (PeekableIterator<BlurResult> iterator : results) {
+      System.out.println(iterator.peek());
+    }
+  }
+
+  private BlurResult newResult(String id, double score) {
+    return new BlurResult(id, score, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java b/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
new file mode 100644
index 0000000..7207eea
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableMultiple;
+import org.apache.blur.manager.results.BlurResultIterableSimple;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.junit.Test;
+
+
+public class MultipleBlurResultIterableTest {
+
+  @Test
+  public void testMultipleHitsIterable() {
+    BlurResultIterableMultiple iterable = new BlurResultIterableMultiple();
+    iterable.addBlurResultIterable(newBlurResultIterable(0, 0.1, 3, 2, 9, 10, 2));
+    iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 9, 1, 34, 53, 12));
+    iterable.addBlurResultIterable(newBlurResultIterable(4, 3));
+    iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 34, 132));
+    iterable.addBlurResultIterable(newBlurResultIterable());
+
+    for (BlurResult hit : iterable) {
+      System.out.println(hit);
+    }
+  }
+
+  private BlurResultIterable newBlurResultIterable(double... ds) {
+    List<BlurResult> results = new ArrayList<BlurResult>();
+    for (double d : ds) {
+      results.add(new BlurResult(UUID.randomUUID().toString() + "-" + Double.toString(d), d, null));
+    }
+    return new BlurResultIterableSimple(UUID.randomUUID().toString(), results);
+  }
+
+}


Mime
View raw message