incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [17/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
new file mode 100644
index 0000000..58d2d3d
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -0,0 +1,890 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.thrift.generated.RecordMutationType.APPEND_COLUMN_VALUES;
+import static org.apache.blur.thrift.generated.RecordMutationType.DELETE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_COLUMNS;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RowMutationType.DELETE_ROW;
+import static org.apache.blur.thrift.generated.RowMutationType.UPDATE_ROW;
+import static org.apache.blur.utils.BlurUtil.match;
+import static org.apache.blur.utils.BlurUtil.newColumn;
+import static org.apache.blur.utils.BlurUtil.newRecord;
+import static org.apache.blur.utils.BlurUtil.newRecordMutation;
+import static org.apache.blur.utils.BlurUtil.newRow;
+import static org.apache.blur.utils.BlurUtil.newRowMutation;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.indexserver.LocalIndexServer;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Facet;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class IndexManagerTest {
+
+  // Name of the shard directory created under the table directory in setUp().
+  private static final String SHARD_NAME = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, 0);
+  // Table name used by the tests when calling the index manager.
+  private static final String TABLE = "table";
+  // Column family names used by the fixture rows written in setupData().
+  private static final String FAMILY = "test-family";
+  private static final String FAMILY2 = "test-family2";
+  // Index server and manager under test; created in setUp() and released in teardown().
+  private LocalIndexServer server;
+  private IndexManager indexManager;
+
+  /**
+   * Builds a fresh fixture before each test: clears the on-disk test directory,
+   * creates the table/shard directory, wires an {@link IndexManager} to a
+   * {@link LocalIndexServer} rooted there, and loads the fixture rows.
+   */
+  @Before
+  public void setUp() throws BlurException, IOException, InterruptedException {
+    File baseDir = new File("./tmp/indexer-manager-test");
+    rm(baseDir);
+
+    File tableDir = new File(baseDir, TABLE);
+    File shardDir = new File(tableDir, SHARD_NAME);
+    shardDir.mkdirs();
+
+    Path basePath = new Path("./tmp/indexer-manager-test");
+    server = new LocalIndexServer(baseDir, basePath);
+
+    indexManager = new IndexManager();
+    indexManager.setStatusCleanupTimerDelay(1000);
+    indexManager.setIndexServer(server);
+    indexManager.setThreadCount(1);
+    indexManager.setBlurMetrics(new BlurMetrics(new Configuration()));
+    indexManager.init();
+
+    setupData();
+  }
+
+  /**
+   * Closes the index manager after each test and drops the fixture references.
+   * JUnit runs this method even when setUp() throws, in which case the manager may
+   * never have been created; the null guard avoids a secondary NullPointerException.
+   */
+  @After
+  public void teardown() {
+    if (indexManager != null) {
+      indexManager.close();
+    }
+    indexManager = null;
+    server = null;
+  }
+
+  /**
+   * Recursively deletes {@code file}; for a directory the children are removed first.
+   * Deletion is best effort: the result of {@link File#delete()} is not checked.
+   *
+   * @param file file or directory to remove
+   */
+  private void rm(File file) {
+    if (file.isDirectory()) {
+      // listFiles() returns null if the directory cannot be listed (I/O error).
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          rm(child);
+        }
+      }
+    }
+    file.delete();
+  }
+
+  /**
+   * Writes the seven fixture rows (row-1 through row-7) that the tests query and
+   * mutate. The mutations are applied in order; only the last one sets
+   * waitToBeVisible.
+   */
+  private void setupData() throws BlurException, IOException {
+    RowMutation row1 = newRowMutation(TABLE, "row-1",
+        newRecordMutation(FAMILY, "record-1",
+            newColumn("testcol1", "value1"),
+            newColumn("testcol2", "value2"),
+            newColumn("testcol3", "value3")));
+
+    RowMutation row2 = newRowMutation(TABLE, "row-2",
+        newRecordMutation(FAMILY, "record-2",
+            newColumn("testcol1", "value4"),
+            newColumn("testcol2", "value5"),
+            newColumn("testcol3", "value6")),
+        newRecordMutation(FAMILY, "record-2B",
+            newColumn("testcol2", "value234123"),
+            newColumn("testcol3", "value234123")));
+
+    RowMutation row3 = newRowMutation(TABLE, "row-3",
+        newRecordMutation(FAMILY, "record-3",
+            newColumn("testcol1", "value7"),
+            newColumn("testcol2", "value8"),
+            newColumn("testcol3", "value9")));
+
+    RowMutation row4 = newRowMutation(TABLE, "row-4",
+        newRecordMutation(FAMILY, "record-4",
+            newColumn("testcol1", "value1"),
+            newColumn("testcol2", "value5"),
+            newColumn("testcol3", "value9")),
+        newRecordMutation(FAMILY, "record-4B",
+            newColumn("testcol2", "value234123"),
+            newColumn("testcol3", "value234123")));
+
+    // record-5B carries two values for testcol3.
+    RowMutation row5 = newRowMutation(TABLE, "row-5",
+        newRecordMutation(FAMILY, "record-5A",
+            newColumn("testcol1", "value13"),
+            newColumn("testcol2", "value14"),
+            newColumn("testcol3", "value15")),
+        newRecordMutation(FAMILY, "record-5B",
+            newColumn("testcol1", "value16"),
+            newColumn("testcol2", "value17"),
+            newColumn("testcol3", "value18"),
+            newColumn("testcol3", "value19")));
+
+    RowMutation row6 = newRowMutation(TABLE, "row-6",
+        newRecordMutation(FAMILY, "record-6A",
+            newColumn("testcol12", "value110"),
+            newColumn("testcol13", "value102")),
+        newRecordMutation(FAMILY, "record-6B",
+            newColumn("testcol12", "value101"),
+            newColumn("testcol13", "value104")),
+        newRecordMutation(FAMILY2, "record-6C",
+            newColumn("testcol18", "value501")));
+
+    RowMutation row7 = newRowMutation(TABLE, "row-7",
+        newRecordMutation(FAMILY, "record-7A",
+            newColumn("testcol12", "value101"),
+            newColumn("testcol13", "value102")),
+        newRecordMutation(FAMILY2, "record-7B",
+            newColumn("testcol18", "value501")));
+    row7.waitToBeVisible = true;
+
+    for (RowMutation fixture : Arrays.asList(row1, row2, row3, row4, row5, row6, row7)) {
+      indexManager.mutate(fixture);
+    }
+  }
+
+  /**
+   * Super query with three required clauses spanning both column families. Expects
+   * two matching rows; each hit, fetched by its location id, must come back as a row
+   * result and not as a record result.
+   */
+  @Test
+  public void testQueryWithJoinAll() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+test-family.testcol12:value101 +test-family.testcol13:value102 +test-family2.testcol18:value501";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Super query that groups the two test-family clauses in a nested required clause
+   * and joins them with a test-family2 clause. Expects exactly one matching row; the
+   * hit, fetched by location id, must be a row result.
+   */
+  @Test
+  public void testQueryWithJoin() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+(+test-family.testcol12:value101 +test-family.testcol13:value102) +test-family2.testcol18:value501";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(1, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Super query made of two required groups, the first of which also contains the
+   * bare term "nojoin". Expects exactly one matching row; the hit, fetched by
+   * location id, must be a row result.
+   */
+  @Test
+  public void testQueryWithJoinForcingSuperQuery() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "+(+test-family.testcol1:value1 nojoin) +(+test-family.testcol3:value234123)";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(1, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+  }
+
+  /**
+   * Super query with two facets, one using a wildcard value and one that matches
+   * nothing. Expects two result rows, facet counts of 2 and 0, and that the query
+   * status is listed right after the query but gone once the cleanup timer has run.
+   */
+  @Test
+  public void testQueryWithFacetsWithWildCard() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value*", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+    // One counter slot per facet, in the order the facets were listed above.
+    AtomicLongArray facetedCounts = new AtomicLongArray(2);
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+
+    assertEquals(2, facetedCounts.get(0));
+    assertEquals(0, facetedCounts.get(1));
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Fetches the row at location id SHARD_NAME + "/0" and expects it to equal the
+   * row-1 fixture with a record count of one.
+   */
+  @Test
+  public void testFetchRowByLocationId() throws Exception {
+    Row expected = newRow("row-1", newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+    expected.recordCount = 1;
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setLocationId(SHARD_NAME + "/0"), fetched);
+
+    assertNotNull(fetched.rowResult.row);
+    assertEquals(expected, fetched.rowResult.row);
+  }
+
+  /**
+   * Fetching by the location id "shard4/0" must fail with a BlurException.
+   */
+  @Test
+  public void testFetchMissingRowByLocationId() throws Exception {
+    Selector missing = new Selector().setLocationId("shard4/0");
+    FetchResult fetched = new FetchResult();
+    try {
+      indexManager.fetchRow(TABLE, missing, fetched);
+      fail("Should throw exception");
+    } catch (BlurException expected) {
+      // The exception is the outcome this test is looking for.
+    }
+  }
+
+  /**
+   * Fetches by location id with recordOnly set: the result must carry a record result
+   * (and no row result) identifying record-1 of row-1 with its three fixture columns.
+   */
+  @Test
+  public void testFetchRecordByLocationId() throws Exception {
+    Selector recordSelector = new Selector();
+    recordSelector.setLocationId(SHARD_NAME + "/0");
+    recordSelector.setRecordOnly(true);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+    assertNull(fetched.rowResult);
+    assertNotNull(fetched.recordResult.record);
+
+    FetchRecordResult recordResult = fetched.recordResult;
+    assertEquals("row-1", recordResult.rowid);
+    assertEquals("record-1", recordResult.record.recordId);
+    assertEquals(FAMILY, recordResult.record.family);
+
+    Record expected = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    assertEquals(expected, recordResult.record);
+  }
+
+  /**
+   * Fetches row-1 by row id and expects it to equal the fixture row with a record
+   * count of one.
+   */
+  @Test
+  public void testFetchRowByRowId() throws Exception {
+    Row expected = newRow("row-1", newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+    expected.recordCount = 1;
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-1"), fetched);
+
+    assertNotNull(fetched.rowResult.row);
+    assertEquals(expected, fetched.rowResult.row);
+  }
+
+  /**
+   * A selector that carries only a record id must be rejected by fetchRow with a
+   * BlurException.
+   */
+  @Test
+  public void testFetchRowByRecordIdOnly() throws Exception {
+    Selector selector = new Selector().setRecordId("record-1");
+    FetchResult fetchResult = new FetchResult();
+    try {
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      fail("Invalid selector should throw exception.");
+    } catch (BlurException e) {
+      // do nothing, this is a pass
+    }
+    // Any other exception type propagates to JUnit, which fails the test and reports
+    // the exception itself rather than a message-less failure.
+  }
+
+  /**
+   * A selector with both a row id and a record id, but without recordOnly set, must
+   * be rejected by fetchRow with a BlurException.
+   */
+  @Test
+  public void testFetchRowByRecordIdOnlyNoRecordOnly() throws Exception {
+    Selector selector = new Selector().setRowId("row-1").setRecordId("record-1");
+    FetchResult fetchResult = new FetchResult();
+    try {
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      fail("Invalid selector should throw exception.");
+    } catch (BlurException e) {
+      // do nothing, this is a pass
+    }
+    // Any other exception type propagates to JUnit, which fails the test and reports
+    // the exception itself rather than a message-less failure.
+  }
+
+  /**
+   * Fetches record-1 of row-1 with recordOnly set and checks the fetch metadata
+   * (exists, not deleted, table name) as well as the returned record.
+   */
+  @Test
+  public void testFetchRowByRecordId() throws Exception {
+    Selector recordSelector = new Selector();
+    recordSelector.setRowId("row-1");
+    recordSelector.setRecordId("record-1");
+    recordSelector.setRecordOnly(true);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+
+    assertFalse(fetched.deleted);
+    assertTrue(fetched.exists);
+    assertEquals(TABLE, fetched.table);
+    assertNull(fetched.rowResult);
+    assertNotNull(fetched.recordResult);
+
+    FetchRecordResult actual = fetched.recordResult;
+    assertEquals(FAMILY, actual.record.family);
+    assertEquals("record-1", actual.record.recordId);
+    assertEquals("row-1", actual.rowid);
+
+    Record expected = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+    assertEquals(expected, actual.record);
+  }
+
+  @Test
+  public void testRecordFrequency() throws Exception {
+    assertEquals(2, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "value1"));
+    assertEquals(0, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "NO VALUE"));
+  }
+
+  /**
+   * The schema reported for the table must list both column families and, for each,
+   * exactly the column names written by the fixture.
+   */
+  @Test
+  public void testSchema() throws Exception {
+    Schema schema = indexManager.schema(TABLE);
+    assertEquals(TABLE, schema.table);
+
+    Map<String, Set<String>> families = schema.columnFamilies;
+
+    // Both sides are copied into TreeSets before being compared.
+    Set<String> expectedFamilies = new TreeSet<String>(Arrays.asList(FAMILY, FAMILY2));
+    assertEquals(expectedFamilies, new TreeSet<String>(families.keySet()));
+
+    Set<String> expectedFamilyColumns = new TreeSet<String>(Arrays.asList("testcol1", "testcol2", "testcol3", "testcol12", "testcol13"));
+    assertEquals(expectedFamilyColumns, new TreeSet<String>(families.get(FAMILY)));
+
+    Set<String> expectedFamily2Columns = new TreeSet<String>(Arrays.asList("testcol18"));
+    assertEquals(expectedFamily2Columns, new TreeSet<String>(families.get(FAMILY2)));
+  }
+
+  /**
+   * Super query on testcol1:value1. Expects two hits, each fetchable by location id as
+   * a row result, and that the query status is listed right after the query but gone
+   * once the cleanup timer has run.
+   */
+  @Test
+  public void testQuerySuperQueryTrue() throws Exception {
+    SimpleQuery simple = new SimpleQuery();
+    simple.queryStr = "test-family.testcol1:value1";
+    simple.superQueryOn = true;
+    simple.type = ScoreType.SUPER;
+
+    BlurQuery query = new BlurQuery();
+    query.simpleQuery = simple;
+    query.fetch = 10;
+    query.minimumNumberOfResults = Long.MAX_VALUE;
+    query.maxQueryTime = Long.MAX_VALUE;
+    query.uuid = 1;
+
+    BlurResultIterable hits = indexManager.query(TABLE, query, null);
+    assertEquals(2, hits.getTotalResults());
+    for (BlurResult hit : hits) {
+      FetchResult fetched = new FetchResult();
+      indexManager.fetchRow(TABLE, new Selector().setLocationId(hit.getLocationId()), fetched);
+      assertNotNull(fetched.rowResult);
+      assertNull(fetched.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Super query with a default selector attached to the query. Expects two hits whose
+   * embedded fetch results are row results, and that the query status is listed right
+   * after the query but gone once the cleanup timer has run.
+   */
+  @Test
+  public void testQuerySuperQueryTrueWithSelector() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.selector = new Selector();
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      assertNotNull(result.fetchResult.rowResult);
+      assertNull(result.fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Same query with superQueryOn disabled. Expects two hits; each hit, fetched by
+   * location id with recordOnly set, must come back as a record result. Also checks
+   * that the query status is cleaned up after the timer has run.
+   */
+  @Test
+  public void testQuerySuperQueryFalse() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = false;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId()).setRecordOnly(true);
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNull(fetchResult.rowResult);
+      assertNotNull(fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Non-super query with a recordOnly selector attached to the query. Expects two
+   * hits whose embedded fetch results are record results, and that the query status
+   * is cleaned up after the timer has run.
+   */
+  @Test
+  public void testQuerySuperQueryFalseWithSelector() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = false;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.selector = new Selector();
+    blurQuery.selector.setRecordOnly(true);
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      assertNull(result.fetchResult.rowResult);
+      assertNotNull(result.fetchResult.recordResult);
+    }
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Query with a recordOnly selector and otherwise default query settings. Expects
+   * two hits, each a record result, covering record-1 and record-4 exactly once each.
+   */
+  @Test
+  public void testQueryRecordOnly() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.selector = new Selector();
+    blurQuery.selector.setRecordOnly(true);
+
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+
+    int matchRecord1 = 0;
+    int matchRecord4 = 0;
+
+    for (BlurResult result : iterable) {
+      assertNull(result.fetchResult.rowResult);
+      assertNotNull(result.fetchResult.recordResult);
+
+      Record r = result.fetchResult.recordResult.record;
+
+      if (r.getRecordId().equals("record-1")) {
+        matchRecord1 += 1;
+      } else if (r.getRecordId().equals("record-4")) {
+        matchRecord4 += 1;
+      } else {
+        fail("Unexpected record ID [" + r.getRecordId() + "]");
+      }
+    }
+
+    assertEquals("Unexpected number of record-1 results", 1, matchRecord1);
+    assertEquals("Unexpected number of record-4 results", 1, matchRecord4);
+  }
+
+  /**
+   * Super query with two exact-term facets, one of which matches nothing. Expects two
+   * result rows, facet counts of 2 and 0, and that the query status is listed right
+   * after the query but gone once the cleanup timer has run.
+   */
+  @Test
+  public void testQueryWithFacets() throws Exception {
+    BlurQuery blurQuery = new BlurQuery();
+    blurQuery.simpleQuery = new SimpleQuery();
+    blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+    blurQuery.simpleQuery.superQueryOn = true;
+    blurQuery.simpleQuery.type = ScoreType.SUPER;
+    blurQuery.fetch = 10;
+    blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+    blurQuery.maxQueryTime = Long.MAX_VALUE;
+    blurQuery.uuid = 1;
+    blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value1", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+    // One counter slot per facet, in the order the facets were listed above.
+    AtomicLongArray facetedCounts = new AtomicLongArray(2);
+    BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+    // assertEquals takes (expected, actual); this order keeps failure messages correct.
+    assertEquals(2, iterable.getTotalResults());
+    for (BlurResult result : iterable) {
+      Selector selector = new Selector().setLocationId(result.getLocationId());
+      FetchResult fetchResult = new FetchResult();
+      indexManager.fetchRow(TABLE, selector, fetchResult);
+      assertNotNull(fetchResult.rowResult);
+      assertNull(fetchResult.recordResult);
+    }
+
+    assertEquals(2, facetedCounts.get(0));
+    assertEquals(0, facetedCounts.get(1));
+
+    assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+    Thread.sleep(2000);// wait for cleanup to fire
+    assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+  }
+
+  /**
+   * Listing up to 100 terms of testcol1 from an empty start term must return the five
+   * fixture values in the order asserted below.
+   */
+  @Test
+  public void testTerms() throws Exception {
+    List<String> expected = Arrays.asList("value1", "value13", "value16", "value4", "value7");
+    List<String> actual = indexManager.terms(TABLE, FAMILY, "testcol1", "", (short) 100);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Applies a row mutation with a single record to the existing row-4 and expects the
+   * fetched row to contain only that record afterwards.
+   */
+  @Test
+  public void testMutationReplaceRow() throws Exception {
+    RowMutation replacement = newRowMutation(TABLE, "row-4",
+        newRecordMutation(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4")));
+    replacement.waitToBeVisible = true;
+    indexManager.mutate(replacement);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-4"), fetched);
+    assertNotNull(fetched.rowResult.row);
+
+    Row expected = newRow("row-4", newRecord(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4")));
+    expected.recordCount = 1;
+    assertEquals(expected, fetched.rowResult.row);
+  }
+
+  /**
+   * Applies a row mutation to a row id that does not exist yet and expects the row to
+   * be created with the given record.
+   *
+   * setupData() writes row-1 through row-7, so row-8 is used here to make sure the
+   * target row really is missing before the mutation.
+   */
+  @Test
+  public void testMutationReplaceMissingRow() throws Exception {
+    String rowId = "row-8";
+    Column c1 = newColumn("testcol1", "value20");
+    Column c2 = newColumn("testcol2", "value21");
+    Column c3 = newColumn("testcol3", "value22");
+    String rec = "record-8";
+    RecordMutation rm = newRecordMutation(FAMILY, rec, c1, c2, c3);
+    RowMutation mutation = newRowMutation(TABLE, rowId, rm);
+    mutation.waitToBeVisible = true;
+    indexManager.mutate(mutation);
+
+    Selector selector = new Selector().setRowId(rowId);
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    Row r = fetchResult.rowResult.row;
+    assertNotNull("new row should exist", r);
+    Row row = newRow(rowId, newRecord(FAMILY, rec, newColumn("testcol1", "value20"), newColumn("testcol2", "value21"), newColumn("testcol3", "value22")));
+    row.recordCount = 1;
+    assertEquals("row should match", row, r);
+  }
+
+  /**
+   * Deletes the existing row-2 and expects a subsequent fetch by row id to return no
+   * row result.
+   */
+  @Test
+  public void testMutationDeleteRow() throws Exception {
+    RowMutation delete = newRowMutation(DELETE_ROW, TABLE, "row-2");
+    delete.waitToBeVisible = true;
+    indexManager.mutate(delete);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-2"), fetched);
+    assertNull("row should be deleted", fetched.rowResult);
+  }
+
+  /**
+   * Issues a DELETE_ROW mutation for a row id that does not exist and expects it to
+   * complete without error, with a subsequent fetch returning no row result.
+   *
+   * setupData() writes row-1 through row-7, so row-8 is used here to make sure the
+   * target row really is missing.
+   */
+  @Test
+  public void testMutationDeleteMissingRow() throws Exception {
+    String rowId = "row-8";
+    RowMutation mutation = newRowMutation(DELETE_ROW, TABLE, rowId);
+    mutation.waitToBeVisible = true;
+    indexManager.mutate(mutation);
+
+    Selector selector = new Selector().setRowId(rowId);
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    assertNull("row should not exist", fetchResult.rowResult);
+  }
+
+  /**
+   * Updates row-3 by deleting record-3, the only record the fixture gives that row,
+   * and expects a subsequent fetch to return no row result.
+   */
+  @Test
+  public void testMutationUpdateRowDeleteLastRecord() throws Exception {
+    RecordMutation deleteRecord = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-3");
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-3", deleteRecord);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-3"), fetched);
+    assertNull("row should not exist", fetched.rowResult);
+  }
+
+  /**
+   * Updates row-5 by deleting record-5A and expects the row to remain with exactly
+   * one record.
+   */
+  @Test
+  public void testMutationUpdateRowDeleteRecord() throws Exception {
+    RecordMutation deleteRecord = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-5A");
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-5", deleteRecord);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-5"), fetched);
+    assertNotNull("row should exist", fetched.rowResult);
+    assertNotNull("row should exist", fetched.rowResult.row);
+    assertEquals("row should have one record", 1, fetched.rowResult.row.getRecordsSize());
+  }
+
+  /**
+   * An UPDATE_ROW mutation that deletes a record from row-101, a row the fixture never
+   * writes, is expected to fail with a BlurException.
+   */
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowDeleteRecord() throws Exception {
+    RecordMutation deleteRecord = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-101");
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-101", deleteRecord);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+  }
+
+  /**
+   * Replaces the existing record-5A of row-5 with three new columns and expects the
+   * fetched record to contain exactly those three columns.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceExistingRecord() throws Exception {
+    String recordId = "record-5A";
+    Column col4 = newColumn("testcol4", "value104");
+    Column col5 = newColumn("testcol5", "value105");
+    Column col6 = newColumn("testcol6", "value105");
+    RecordMutation replace = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, recordId, col4, col5, col6);
+
+    Record updated = updateAndFetchRecord("row-5", recordId, replace);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 3 columns in record", 3, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(col4));
+    assertTrue("column 2 should be in record", updated.columns.contains(col5));
+    assertTrue("column 3 should be in record", updated.columns.contains(col6));
+  }
+
+  /**
+   * Issues a REPLACE_ENTIRE_RECORD for record-5C, which the fixture does not write to
+   * row-5, and expects the fetched record to exist with exactly the three new columns.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceMissingRecord() throws Exception {
+    String recordId = "record-5C";
+    Column col4 = newColumn("testcol4", "value104");
+    Column col5 = newColumn("testcol5", "value105");
+    Column col6 = newColumn("testcol6", "value105");
+    RecordMutation replace = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, recordId, col4, col5, col6);
+
+    Record updated = updateAndFetchRecord("row-5", recordId, replace);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 3 columns in record", 3, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(col4));
+    assertTrue("column 2 should be in record", updated.columns.contains(col5));
+    assertTrue("column 3 should be in record", updated.columns.contains(col6));
+  }
+
+  /**
+   * Updates row-5 with two REPLACE_ENTIRE_RECORD mutations, one for the existing
+   * record-5A and one for the new record-5C. Expects three records afterwards: one
+   * matching each mutation and one matching neither.
+   */
+  @Test
+  public void testMutationUpdateRowReplaceMixedRecords() throws Exception {
+    Column c1 = newColumn("testcol4", "value104");
+    Column c2 = newColumn("testcol5", "value105");
+    Column c3 = newColumn("testcol6", "value105");
+    RecordMutation rm1 = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5A", c1, c2, c3);
+
+    Column c4 = newColumn("testcol4", "value104");
+    Column c5 = newColumn("testcol5", "value105");
+    Column c6 = newColumn("testcol6", "value105");
+    RecordMutation rm2 = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5C", c4, c5, c6);
+
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, "row-5", rm1, rm2);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, new Selector().setRowId("row-5"), fetched);
+    Row updatedRow = fetched.rowResult.row;
+    assertNotNull("row should exist", updatedRow);
+    assertEquals("only 3 records in row", 3, updatedRow.getRecordsSize());
+
+    // Each record is counted once: against rm1 first, then rm2, otherwise as a non-match.
+    int rm1Matches = 0;
+    int rm2Matches = 0;
+    int nonMatches = 0;
+    for (Record candidate : updatedRow.records) {
+      if (match(rm1, candidate)) {
+        rm1Matches++;
+        continue;
+      }
+      if (match(rm2, candidate)) {
+        rm2Matches++;
+        continue;
+      }
+      nonMatches++;
+    }
+    assertEquals("matching record should be updated", 1, rm1Matches);
+    assertEquals("missing record should be added", 1, rm2Matches);
+    assertEquals("unmodified record should exist", 1, nonMatches);
+  }
+
+  // Expects BlurException when a REPLACE_ENTIRE_RECORD for "record-100" is sent to "row-100".
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowReplaceRecord() throws Exception {
+    final String recordId = "record-100";
+    RecordMutation mutation = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, recordId,
+        newColumn("testcol1", "value104"), newColumn("testcol2", "value105"), newColumn("testcol3", "value105"));
+
+    // The helper both applies the update and fetches the record; the returned record is not inspected.
+    updateAndFetchRecord("row-100", recordId, mutation);
+  }
+
+  // REPLACE_COLUMNS with new testcol1/testcol2 values: the record keeps 3 columns and testcol3=value3.
+  @Test
+  public void testMutationUpdateRowReplaceExistingColumns() throws Exception {
+    final String recordId = "record-1";
+    Column first = newColumn("testcol1", "value999");
+    Column second = newColumn("testcol2", "value9999");
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, first, second);
+    Record updated = updateAndFetchRecord("row-1", recordId, mutation);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 3 columns in record", 3, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(first));
+    assertTrue("column 2 should be in record", updated.columns.contains(second));
+    boolean untouched = false;
+    for (Column candidate : updated.columns) {
+      untouched = candidate.name.equals("testcol3") && candidate.value.equals("value3");
+      if (untouched) {
+        break;
+      }
+    }
+    assertTrue("column 3 should be unmodified", untouched);
+  }
+
+  // REPLACE_COLUMNS with one testcol3 value on "record-5B": no other value may remain under that name.
+  @Test
+  public void testMutationUpdateRowReplaceExistingDuplicateColumns() throws Exception {
+    final String recordId = "record-5B";
+    Column replacement = newColumn("testcol3", "value999");
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, replacement);
+    Record updated = updateAndFetchRecord("row-5", recordId, mutation);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 3 columns in record", 3, updated.getColumnsSize());
+    assertTrue("new column should be in record", updated.columns.contains(replacement));
+    boolean staleValueLeft = false;
+    for (Column candidate : updated.columns) {
+      staleValueLeft = candidate.name.equals(replacement.name) && !candidate.value.equals(replacement.value);
+      if (staleValueLeft) {
+        break;
+      }
+    }
+    assertFalse("duplicate columns should be removed", staleValueLeft);
+  }
+
+  // REPLACE_COLUMNS naming testcol4 and testcol5 on "record-1": the record is expected to end up with 5 columns.
+  @Test
+  public void testMutationUpdateRowReplaceMissingColumns() throws Exception {
+    final String recordId = "record-1";
+    Column first = newColumn("testcol4", "value999");
+    Column second = newColumn("testcol5", "value9999");
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, first, second);
+    Record updated = updateAndFetchRecord("row-1", recordId, mutation);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 5 columns in record", 5, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(first));
+    assertTrue("column 2 should be in record", updated.columns.contains(second));
+  }
+
+  // REPLACE_COLUMNS naming testcol1 and testcol4 on "record-1": the record is expected to end up with 4 columns.
+  @Test
+  public void testMutationUpdateRowReplaceMixedColumns() throws Exception {
+    final String recordId = "record-1";
+    Column first = newColumn("testcol1", "value999");
+    Column second = newColumn("testcol4", "value9999");
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId, first, second);
+    Record updated = updateAndFetchRecord("row-1", recordId, mutation);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 4 columns in record", 4, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(first));
+    assertTrue("column 2 should be in record", updated.columns.contains(second));
+  }
+
+  // REPLACE_COLUMNS aimed at "record-1B" of "row-1" is expected to surface a BlurException.
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateRowMissingRecordReplaceColumns() throws Exception {
+    final String recordId = "record-1B";
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId,
+        newColumn("testcol4", "value999"), newColumn("testcol5", "value9999"));
+
+    updateAndFetchRecord("row-1", recordId, mutation);
+  }
+
+  // REPLACE_COLUMNS aimed at "record-6" of "row-6" is expected to surface a BlurException.
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowReplaceColumns() throws Exception {
+    final String recordId = "record-6";
+    RecordMutation mutation = newRecordMutation(REPLACE_COLUMNS, FAMILY, recordId,
+        newColumn("testcol1", "value999"), newColumn("testcol2", "value9999"));
+
+    updateAndFetchRecord("row-6", recordId, mutation);
+  }
+
+  /**
+   * Sends APPEND_COLUMN_VALUES with values for testcol1, testcol2 and testcol4 to "record-1" of
+   * "row-1".
+   *
+   * Per the assertions, the record must end with six columns: two each under testcol1 and
+   * testcol2, one each under testcol3 and testcol4, and none under any other name.
+   */
+  @Test
+  public void testMutationUpdateRowAppendColumns() throws Exception {
+    final String recordId = "record-1";
+    Column first = newColumn("testcol1", "value999");
+    Column second = newColumn("testcol2", "value9999");
+    Column third = newColumn("testcol4", "hmm");
+    RecordMutation mutation = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, recordId, first, second, third);
+    Record updated = updateAndFetchRecord("row-1", recordId, mutation);
+
+    assertNotNull("record should exist", updated);
+    assertEquals("only 6 columns in record", 6, updated.getColumnsSize());
+    assertTrue("column 1 should be in record", updated.columns.contains(first));
+    assertTrue("column 2 should be in record", updated.columns.contains(second));
+    assertTrue("column 3 should be in record", updated.columns.contains(third));
+
+    // Count columns per tracked name; the extra last slot collects every other name.
+    String[] tracked = { "testcol1", "testcol2", "testcol3", "testcol4" };
+    int[] seen = new int[tracked.length + 1];
+    for (Column candidate : updated.columns) {
+      int slot = 0;
+      while (slot < tracked.length && !candidate.name.equals(tracked[slot])) {
+        slot++;
+      }
+      seen[slot]++;
+    }
+
+    assertEquals("should append testcol1", 2, seen[0]);
+    assertEquals("should append testcol2", 2, seen[1]);
+    assertEquals("should not append testcol3", 1, seen[2]);
+    assertEquals("should append testcol4", 1, seen[3]);
+    assertEquals("should not find other columns", 0, seen[4]);
+  }
+
+  // APPEND_COLUMN_VALUES aimed at "record-1B" of "row-1" is expected to surface a BlurException.
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateRowMissingRecordAppendColumns() throws Exception {
+    final String recordId = "record-1B";
+    RecordMutation mutation = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, recordId,
+        newColumn("testcol1", "value999"), newColumn("testcol2", "value9999"), newColumn("testcol4", "hmm"));
+
+    // The record returned by the helper is not inspected; only the exception matters here.
+    updateAndFetchRecord("row-1", recordId, mutation);
+  }
+
+  // APPEND_COLUMN_VALUES aimed at "record-6" of "row-6" is expected to surface a BlurException.
+  @Test(expected = BlurException.class)
+  public void testMutationUpdateMissingRowAppendColumns() throws Exception {
+    final String recordId = "record-6";
+    RecordMutation mutation = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, recordId,
+        newColumn("testcol1", "value999"), newColumn("testcol2", "value9999"));
+
+    updateAndFetchRecord("row-6", recordId, mutation);
+  }
+
+  // Applies the mutations to rowId via one UPDATE_ROW, then fetches only recordId; null if no recordResult is set.
+  private Record updateAndFetchRecord(String rowId, String recordId, RecordMutation... recordMutations) throws Exception {
+    RowMutation update = newRowMutation(UPDATE_ROW, TABLE, rowId, recordMutations);
+    update.waitToBeVisible = true;
+    indexManager.mutate(update);
+    Selector recordSelector = new Selector().setRowId(rowId).setRecordId(recordId);
+    recordSelector.setRecordOnly(true);
+    FetchResult fetched = new FetchResult();
+    indexManager.fetchRow(TABLE, recordSelector, fetched);
+    return fetched.recordResult == null ? null : fetched.recordResult.record;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
new file mode 100644
index 0000000..cd55768
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -0,0 +1,255 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/** Exercises ZookeeperClusterStatus against the ZooKeeper server that MiniCluster starts once for the class. */
+public class ZookeeperClusterStatusTest {
+
+  private static final String TEST = "test";
+  private static final String DEFAULT = "default";
+
+  private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatusTest.class);
+  private ZooKeeper zooKeeper;
+  private ZookeeperClusterStatus clusterStatus;
+
+  public static class QuorumPeerMainRun extends QuorumPeerMain {
+    @Override
+    public void initializeAndRun(String[] args) throws ConfigException, IOException {
+      super.initializeAndRun(args);
+    }
+  }
+
+  @BeforeClass
+  public static void setupOnce() throws InterruptedException, IOException, KeeperException {
+    MiniCluster.startZooKeeper("./tmp/zk_test");
+  }
+
+  @AfterClass
+  public static void teardownOnce() {
+    MiniCluster.shutdownZooKeeper();
+  }
+
+  @Before
+  public void setup() throws KeeperException, InterruptedException, IOException {
+    zooKeeper = new ZooKeeper(MiniCluster.getZkConnectionString(), 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Intentionally empty: these tests do not react to watcher events on this connection.
+      }
+    });
+    BlurUtil.setupZookeeper(zooKeeper, DEFAULT);
+    clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    clusterStatus.close();
+    zooKeeper.close();
+  }
+
+  @Test
+  public void testGetClusterList() {
+    LOG.warn("testGetClusterList");
+    List<String> clusterList = clusterStatus.getClusterList(false);
+    assertEquals(Arrays.asList(DEFAULT), clusterList);
+  }
+
+  @Test
+  public void testSafeModeNotSet() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeNotSet");
+    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(false);
+  }
+
+  @Test
+  public void testSafeModeSetInPast() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeSetInPast");
+    setSafeModeInPast();
+    assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(false);
+  }
+
+  @Test
+  public void testSafeModeSetInFuture() throws KeeperException, InterruptedException {
+    LOG.warn("testSafeModeSetInFuture");
+    setSafeModeInFuture();
+    assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.isInSafeMode(true, DEFAULT);
+      }
+    }.test(true);
+  }
+
+  @Test
+  public void testGetClusterNoTable() {
+    LOG.warn("testGetClusterNoTable");
+    assertNull(clusterStatus.getCluster(false, TEST));
+    assertNull(clusterStatus.getCluster(true, TEST));
+  }
+
+  @Test
+  public void testGetClusterTable() throws KeeperException, InterruptedException {
+    LOG.warn("testGetClusterTable");
+    createTable(TEST);
+    assertEquals(DEFAULT, clusterStatus.getCluster(false, TEST));
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus.getCluster(true, TEST);
+      }
+    }.test(DEFAULT);
+  }
+
+  // NOTE(review): creates no table itself; presumably relies on one left by another test (e.g. testGetClusterTable) - confirm.
+  @Test
+  public void testGetTableList() {
+    assertEquals(Arrays.asList(TEST), clusterStatus.getTableList(false));
+  }
+
+  @Test
+  public void testIsEnabledNoTable() {
+    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "notable"));
+    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "notable"));
+  }
+
+  @Test
+  public void testIsEnabledDisabledTable() throws KeeperException, InterruptedException {
+    createTable("disabledtable", false);
+    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "disabledtable"));
+    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "disabledtable"));
+  }
+
+  @Test
+  public void testIsEnabledEnabledTable() throws KeeperException, InterruptedException {
+    createTable("enabledtable", true);
+    assertTrue(clusterStatus.isEnabled(false, DEFAULT, "enabledtable"));
+    assertTrue(clusterStatus.isEnabled(true, DEFAULT, "enabledtable"));
+  }
+
+  private void createTable(String name) throws KeeperException, InterruptedException {
+    createTable(name, true);
+  }
+
+  private void createTable(String name, boolean enabled) throws KeeperException, InterruptedException {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName(name);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri("./tmp/zk_test_hdfs");
+    tableDescriptor.setIsEnabled(enabled);
+    clusterStatus.createTable(tableDescriptor);
+    if (enabled) {
+      clusterStatus.enableTable(tableDescriptor.getCluster(), name);
+    }
+  }
+
+  // Polling helper: subclasses supply run(); test(expected) retries it until the answer matches or time runs out.
+  public abstract class WaitForAnswerToBeCorrect {
+    private final long totalWaitTimeNanos;
+
+    public WaitForAnswerToBeCorrect(long totalWaitTimeMs) {
+      this.totalWaitTimeNanos = TimeUnit.MILLISECONDS.toNanos(totalWaitTimeMs);
+    }
+
+    public abstract Object run();
+
+    // Calls run() with a 1 ms sleep between attempts until it returns o; a null answer counts as "not yet", not an error.
+    public void test(Object o) {
+      long start = System.nanoTime();
+      while (true) {
+        Object object = run();
+        if (object == o || (object != null && object.equals(o))) {
+          return;
+        }
+        if (System.nanoTime() - start > totalWaitTimeNanos) {
+          fail("expected <" + o + "> but last answer was <" + object + ">");
+        }
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail(e.getMessage());
+        }
+      }
+    }
+  }
+
+  private void setSafeModeInPast() throws KeeperException, InterruptedException {
+    writeSafeModeTimestamp(System.currentTimeMillis() - 60000);
+  }
+
+  private void setSafeModeInFuture() throws KeeperException, InterruptedException {
+    writeSafeModeTimestamp(System.currentTimeMillis() + 60000);
+  }
+  // Stores timestampMs as decimal text in the DEFAULT cluster's safemode node, creating the node first when absent.
+  private void writeSafeModeTimestamp(long timestampMs) throws KeeperException, InterruptedException {
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+    byte[] data = Long.toString(timestampMs).getBytes();
+    if (zooKeeper.exists(blurSafemodePath, false) == null) {
+      zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+    zooKeeper.setData(blurSafemodePath, data, -1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
new file mode 100644
index 0000000..f3136ea
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
@@ -0,0 +1,116 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.junit.Test;
+
+
+/**
+ * Tests for DistributedLayoutManager: layout contents with and without offline nodes, plus a
+ * timing check on repeated getLayout() calls.
+ */
+public class DistributedLayoutManagerTest {
+
+  /**
+   * Builds one layout with every node available and one with "n2" marked offline. Both must map
+   * all five shards; the first must use all three nodes, the second only the remaining two.
+   */
+  @Test
+  public void testLayoutManager() {
+    TreeSet<String> allNodes = new TreeSet<String>(Arrays.asList("n1", "n2", "n3"));
+    TreeSet<String> offline = new TreeSet<String>(Arrays.asList("n2"));
+    TreeSet<String> allShards = new TreeSet<String>(Arrays.asList("s1", "s2", "s3", "s4", "s5"));
+
+    DistributedLayoutManager everythingUp = new DistributedLayoutManager();
+    everythingUp.setNodes(allNodes);
+    everythingUp.setShards(allShards);
+    everythingUp.init();
+    Map<String, String> fullLayout = everythingUp.getLayout();
+
+    DistributedLayoutManager oneDown = new DistributedLayoutManager();
+    oneDown.setNodes(allNodes);
+    oneDown.setShards(allShards);
+    oneDown.setNodesOffline(offline);
+    oneDown.init();
+    Map<String, String> degradedLayout = oneDown.getLayout();
+
+    assertEquals(allShards, new TreeSet<String>(fullLayout.keySet()));
+    assertEquals(allNodes, new TreeSet<String>(fullLayout.values()));
+
+    TreeSet<String> stillOnline = new TreeSet<String>(allNodes);
+    stillOnline.removeAll(offline);
+    assertEquals(allShards, new TreeSet<String>(degradedLayout.keySet()));
+    assertEquals(stillOnline, new TreeSet<String>(degradedLayout.values()));
+  }
+
+  /** Runs an untimed batch of 100000 getLayout() calls, then requires a timed batch of 100000 to take under 100 ms. */
+  @Test
+  public void testLayoutManagerPerformance() {
+    DistributedLayoutManager manager = new DistributedLayoutManager();
+    manager.setNodes(getTestNodes());
+    manager.setShards(getTestShards());
+    manager.setNodesOffline(getTestOfflineNodes());
+    manager.init();
+
+    final int testSize = 100000;
+    int remaining = testSize;
+    while (remaining-- > 0) {
+      manager.getLayout();
+    }
+    long startedAt = System.nanoTime();
+    for (int call = 0; call < testSize; call++) {
+      manager.getLayout();
+    }
+    long elapsedNanos = System.nanoTime() - startedAt;
+    double ms = elapsedNanos / 1000000.0;
+    System.out.println("Total    " + ms);
+    System.out.println("Per Call " + ms / testSize);
+    assertTrue(ms < 100);
+  }
+
+  private static Collection<String> getTestOfflineNodes() {
+    return Arrays.asList("n13");
+  }
+
+  private static Collection<String> getTestShards() {
+    return numbered("s", 701);
+  }
+
+  private static Collection<String> getTestNodes() {
+    return numbered("n", 32);
+  }
+
+  // Returns a HashSet holding prefix + i for every i in [0, count).
+  private static Collection<String> numbered(String prefix, int count) {
+    Collection<String> names = new HashSet<String>();
+    for (int i = 0; i < count; i++) {
+      names.add(prefix + i);
+    }
+    return names;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
new file mode 100644
index 0000000..02107fd
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+import org.junit.Test;
+
+
+public class BlurResultPeekableIteratorComparatorTest {
+
+  @Test
+  public void testResultPeekableIteratorComparator() {
+    List<PeekableIterator<BlurResult>> results = new ArrayList<PeekableIterator<BlurResult>>();
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("5", 5))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("2", 2))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("1", 1))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("9", 1))).iterator()));
+    results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+
+    Collections.sort(results, BlurConstants.HITS_PEEKABLE_ITERATOR_COMPARATOR);
+
+    for (PeekableIterator<BlurResult> iterator : results) {
+      System.out.println(iterator.peek());
+    }
+  }
+
+  private BlurResult newResult(String id, double score) {
+    return new BlurResult(id, score, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
new file mode 100644
index 0000000..7207eea
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableMultiple;
+import org.apache.blur.manager.results.BlurResultIterableSimple;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.junit.Test;
+
+
+public class MultipleBlurResultIterableTest {
+
+  @Test
+  public void testMultipleHitsIterable() {
+    BlurResultIterableMultiple iterable = new BlurResultIterableMultiple();
+    iterable.addBlurResultIterable(newBlurResultIterable(0, 0.1, 3, 2, 9, 10, 2));
+    iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 9, 1, 34, 53, 12));
+    iterable.addBlurResultIterable(newBlurResultIterable(4, 3));
+    iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 34, 132));
+    iterable.addBlurResultIterable(newBlurResultIterable());
+
+    for (BlurResult hit : iterable) {
+      System.out.println(hit);
+    }
+  }
+
+  private BlurResultIterable newBlurResultIterable(double... ds) {
+    List<BlurResult> results = new ArrayList<BlurResult>();
+    for (double d : ds) {
+      results.add(new BlurResult(UUID.randomUUID().toString() + "-" + Double.toString(d), d, null));
+    }
+    return new BlurResultIterableSimple(UUID.randomUUID().toString(), results);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
new file mode 100644
index 0000000..f590ef0
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
@@ -0,0 +1,54 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.blur.manager.results.PeekableIterator;
+import org.junit.Test;
+
+
+public class PeekableIteratorTest {
+
+  // Smoke tests only: values are printed, nothing is asserted.
+  @Test
+  public void testPeekableIterator1() {
+    printPeeksAndDrain(new PeekableIterator<Integer>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).iterator()));
+  }
+
+  @Test
+  public void testPeekableIteratorEmpty() {
+    PeekableIterator<Integer> empty = new PeekableIterator<Integer>(new ArrayList<Integer>().iterator());
+    for (int repeat = 3; repeat > 0; repeat--) {
+      System.out.println(empty.peek());
+    }
+    printPeeksAndDrain(empty);
+  }
+
+  // While hasNext() holds: prints peek() three times, then prints what next() returns.
+  private static void printPeeksAndDrain(PeekableIterator<Integer> source) {
+    while (source.hasNext()) {
+      for (int repeat = 3; repeat > 0; repeat--) {
+        System.out.println(source.peek());
+      }
+      System.out.println(source.next());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
new file mode 100644
index 0000000..84164b1
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
@@ -0,0 +1,40 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.blur.manager.stats.WeightedAvg;
+import org.junit.Test;
+
+
+public class WeightedAvgTest {
+  // While adding 1..10 to a WeightedAvg built with size 10, getAvg() must equal the plain running mean each time.
+  @Test
+  public void testAvg() {
+    final int capacity = 10;
+    WeightedAvg average = new WeightedAvg(capacity);
+    double runningSum = 0;
+    for (int value = 1; value <= capacity; value++) {
+      average.add(value);
+      runningSum += value;
+      double expectedMean = runningSum / value;
+      assertTrue(Double.compare(expectedMean, average.getAvg()) == 0);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
new file mode 100644
index 0000000..9e402f0
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -0,0 +1,154 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/** Writes generated rows through a {@link BlurNRTIndex} and checks the resulting document counts. */
+public class BlurNRTIndexTest {
+
+  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
+  private static final int TEST_NUMBER = 50000;
+  private BlurNRTIndex writer;
+  private BlurIndexCloser closer;
+  private Random random = new Random();
+  private BlurIndexRefresher refresher;
+  private ExecutorService service;
+  private File base;
+
+  /** Recreates the scratch directory under {@code ./tmp} and initialises a writer on top of it. */
+  @Before
+  public void setup() throws IOException {
+    base = new File("./tmp/blur-index-writer-test");
+    rm(base);
+    base.mkdirs();
+    closer = new BlurIndexCloser();
+    closer.init();
+
+    Configuration configuration = new Configuration();
+    BlurAnalyzer analyzer = new BlurAnalyzer(new KeywordAnalyzer());
+
+    // NOTE(review): the refresher is started but never attached to the writer - confirm intent.
+    refresher = new BlurIndexRefresher();
+    refresher.init();
+
+    writer = new BlurNRTIndex();
+    writer.setDirectory(FSDirectory.open(new File(base, "index")));
+    writer.setCloser(closer);
+    writer.setAnalyzer(analyzer);
+    writer.setSimilarity(new FairSimilarity());
+    writer.setTable("testing-table");
+    writer.setShard("testing-shard");
+
+    // NOTE(review): nothing in this class submits work to the pool; it is only shut down again.
+    service = Executors.newThreadPool("test", 10);
+    writer.setWalPath(new Path(new File(base, "wal").toURI()));
+    writer.setConfiguration(configuration);
+    writer.setTimeBetweenRefreshs(25);
+    writer.init();
+  }
+
+  /** Closes the collaborators, then always stops the pool and removes the scratch directory. */
+  @After
+  public void tearDown() throws IOException {
+    try {
+      refresher.close();
+      writer.close();
+      closer.close();
+    } finally {
+      // Runs even when a close() above throws, so no threads or temp files outlive the test.
+      service.shutdownNow();
+      rm(base);
+    }
+  }
+
+  /** Recursively deletes {@code file}; a missing path is a no-op. */
+  private void rm(File file) {
+    File[] children = file.listFiles(); // null for non-directories, missing or unreadable paths
+    if (children != null) {
+      for (File child : children) {
+        rm(child);
+      }
+    }
+    file.delete(); // best effort: the result is ignored, as before
+  }
+
+  /** Writes rows one at a time and expects each write to be visible in the very next reader. */
+  @Test
+  public void testBlurIndexWriter() throws IOException {
+    long start = System.nanoTime();
+    int written = 0;
+    while (written < TEST_NUMBER_WAIT_VISIBLE) {
+      writer.replaceRow(true, true, genRow()); // presumably (waitToBeVisible, wal, row) - confirm
+      written++;
+      assertEquals(written, writer.getIndexReader().numDocs());
+    }
+    double seconds = (System.nanoTime() - start) / 1000000000.0;
+    System.out.println("Rate " + (written / seconds));
+    assertEquals(TEST_NUMBER_WAIT_VISIBLE, writer.getIndexReader().numDocs());
+  }
+
+  /** Writes rows in bulk with the first flag off, refreshes once, then checks the total count. */
+  @Test
+  public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
+    long start = System.nanoTime();
+    for (int written = 0; written < TEST_NUMBER; written++) {
+      writer.replaceRow(false, true, genRow());
+    }
+    double seconds = (System.nanoTime() - start) / 1000000000.0;
+    System.out.println("Rate " + (TEST_NUMBER / seconds));
+    writer.refresh();
+    assertEquals(TEST_NUMBER, writer.getIndexReader().numDocs());
+  }
+
+  /** Builds a one-record row (family "testing") with random ids and ten random column values. */
+  private Row genRow() {
+    Row row = new Row();
+    row.setId(Long.toString(random.nextLong()));
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(random.nextLong()));
+    for (int col = 0; col < 10; col++) {
+      record.addToColumns(new Column("col" + col, Long.toString(random.nextLong())));
+    }
+    row.addToRecords(record);
+    return row;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
new file mode 100644
index 0000000..c0999a8
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
@@ -0,0 +1,345 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.blur.manager.writer.DirectoryReferenceCounter;
+import org.apache.blur.manager.writer.DirectoryReferenceFileGC;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.junit.Test;
+
+
+/** Shows why {@link DirectoryReferenceCounter} is needed on directories that delete files still in use. */
+public class DirectoryReferenceCounterTest {
+
+  /** Writing straight to the wrapped directory (no reference counter), reading the readers is expected to throw. */
+  @Test
+  public void testDirectoryReferenceCounterTestError() throws CorruptIndexException, IOException {
+    Directory directory = wrap(new RAMDirectory());
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, new KeywordAnalyzer());
+    IndexWriter writer = new IndexWriter(directory, conf);
+    int size = 100;
+    IndexReader[] readers = new IndexReader[size];
+    for (int i = 0; i < size; i++) {
+      writer.addDocument(getDoc());
+      readers[i] = IndexReader.open(writer, true);
+      writer.forceMerge(1); // presumably deletes segment files that earlier readers still use
+    }
+    try {
+      for (int i = 0; i < size; i++) {
+        checkReader(readers[i], i);
+      }
+      fail(); // throws AssertionError, which the catch below does not swallow
+    } catch (Exception e) {
+      // should error
+    }
+  }
+
+  /** Through {@link DirectoryReferenceCounter} readers must stay readable; closing them plus GC must shrink the listing. */
+  @Test
+  public void testDirectoryReferenceCounter() throws CorruptIndexException, LockObtainFailedException, IOException, InterruptedException {
+    Directory directory = wrap(new RAMDirectory());
+    DirectoryReferenceFileGC gc = new DirectoryReferenceFileGC();
+    gc.init();
+    DirectoryReferenceCounter counter = new DirectoryReferenceCounter(directory, gc);
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, new KeywordAnalyzer());
+    IndexWriter writer = new IndexWriter(counter, conf);
+    int size = 100;
+    IndexReader[] readers = new IndexReader[size];
+    for (int i = 0; i < size; i++) {
+      writer.addDocument(getDoc());
+      writer.forceMerge(1);
+      readers[i] = IndexReader.open(writer, true);
+    }
+
+    // Reader i was opened after i + 1 adds and must still be readable.
+    for (int i = 0; i < size; i++) {
+      assertEquals(i + 1, readers[i].numDocs());
+      checkReader(readers[i], i);
+    }
+    String[] listAll = directory.listAll();
+    for (int i = 0; i < size - 1; i++) {
+      readers[i].close();
+    }
+    // Run the GC by hand; the listing assertion below expects it to have removed files by then.
+    for (int i = 0; i < 1000; i++) {
+      gc.run();
+      Thread.sleep(1);
+    }
+    IndexReader last = readers[size - 1];
+    assertEquals(size, last.numDocs()); // was a hard-coded 100
+    assertTrue(listAll.length > directory.listAll().length);
+
+    last.close();
+    writer.close();
+    gc.close();
+  }
+
+  /** Builds the single-field document ("id" = "value") that every test here indexes. */
+  private Document getDoc() {
+    Document document = new Document();
+    document.add(new Field("id", "value", Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    return document;
+  }
+
+  /** Loads the first {@code size} stored documents from the reader and checks their "id" field. */
+  private void checkReader(IndexReader indexReader, int size) throws CorruptIndexException, IOException {
+    for (int docId = 0; docId < size; docId++) {
+      Document document = indexReader.document(docId);
+      assertEquals("value", document.get("id")); // expected value first, per the JUnit contract
+    }
+  }
+
+  // Simulates a directory that forcefully deletes files even while they are still in use
+  // (e.g. HDFSDirectory): inputs opened on a file start failing once that file is deleted.
+  public static Directory wrap(final RAMDirectory ramDirectory) {
+    return new Directory() {
+      private Directory d = ramDirectory;
+      private Collection<String> deletedFiles = new LinkedBlockingQueue<String>();
+
+      @SuppressWarnings("deprecation")
+      public void touchFile(String name) throws IOException {
+        d.touchFile(name);
+      }
+
+      public void deleteFile(String name) throws IOException {
+        deletedFiles.add(name); // remembered so inputs already opened on this name start throwing
+        d.deleteFile(name);
+      }
+
+      public IndexOutput createOutput(String name) throws IOException {
+        return d.createOutput(name);
+      }
+
+      @SuppressWarnings("deprecation")
+      public void sync(String name) throws IOException {
+        d.sync(name);
+      }
+
+      public void sync(Collection<String> names) throws IOException {
+        d.sync(names);
+      }
+
+      public IndexInput openInput(String name) throws IOException {
+        return wrap(d.openInput(name), deletedFiles, name);
+      }
+
+      public IndexInput openInput(String name, int bufferSize) throws IOException {
+        return wrap(d.openInput(name, bufferSize), deletedFiles, name);
+      }
+
+      public void clearLock(String name) throws IOException {
+        d.clearLock(name);
+      }
+
+      public void close() throws IOException {
+        d.close();
+      }
+
+      public void setLockFactory(LockFactory lockFactory) throws IOException {
+        d.setLockFactory(lockFactory);
+      }
+
+      public String getLockID() {
+        return d.getLockID();
+      }
+
+      public void copy(Directory to, String src, String dest) throws IOException {
+        d.copy(to, src, dest);
+      }
+
+      public boolean equals(Object arg0) {
+        return d.equals(arg0);
+      }
+
+      public boolean fileExists(String name) throws IOException {
+        return d.fileExists(name);
+      }
+
+      @SuppressWarnings("deprecation")
+      public long fileModified(String name) throws IOException {
+        return d.fileModified(name);
+      }
+
+      public long fileLength(String name) throws IOException {
+        return d.fileLength(name);
+      }
+
+      public LockFactory getLockFactory() {
+        return d.getLockFactory();
+      }
+
+      public int hashCode() {
+        return d.hashCode();
+      }
+
+      public String[] listAll() throws IOException {
+        return d.listAll();
+      }
+
+      public Lock makeLock(String name) {
+        return d.makeLock(name);
+      }
+
+      public String toString() {
+        return d.toString();
+      }
+    };
+  }
+
+  /** Wraps {@code input} so most operations throw IOException once {@code name} is in {@code deletedFiles}. */
+  @SuppressWarnings("deprecation")
+  public static IndexInput wrap(final IndexInput input, final Collection<String> deletedFiles, final String name) {
+    return new IndexInput() {
+      private IndexInput in = input;
+
+      private void checkForDeleted() throws IOException {
+        if (deletedFiles.contains(name)) {
+          throw new IOException("File [" + name + "] does not exist");
+        }
+      }
+
+      public void skipChars(int length) throws IOException {
+        checkForDeleted();
+        in.skipChars(length);
+      }
+
+      public void setModifiedUTF8StringsMode() {
+        in.setModifiedUTF8StringsMode();
+      }
+
+      public void close() throws IOException {
+        checkForDeleted();
+        in.close();
+      }
+
+      public short readShort() throws IOException {
+        checkForDeleted();
+        return in.readShort();
+      }
+
+      public void seek(long pos) throws IOException {
+        checkForDeleted();
+        in.seek(pos);
+      }
+
+      public int readInt() throws IOException {
+        checkForDeleted();
+        return in.readInt();
+      }
+
+      public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+        checkForDeleted();
+        in.copyBytes(out, numBytes);
+      }
+
+      public int readVInt() throws IOException {
+        checkForDeleted();
+        return in.readVInt();
+      }
+
+      public String toString() {
+        return in.toString();
+      }
+
+      public long readLong() throws IOException {
+        checkForDeleted();
+        return in.readLong();
+      }
+
+      public long readVLong() throws IOException {
+        checkForDeleted();
+        return in.readVLong();
+      }
+
+      public String readString() throws IOException {
+        checkForDeleted();
+        return in.readString();
+      }
+
+      public Object clone() {
+        return wrap((IndexInput) in.clone(), deletedFiles, name); // a clone needs its own delegate and position
+      }
+
+      public boolean equals(Object obj) {
+        return in.equals(obj);
+      }
+
+      public long getFilePointer() {
+        return in.getFilePointer();
+      }
+
+      public int hashCode() {
+        return in.hashCode();
+      }
+
+      public byte readByte() throws IOException {
+        checkForDeleted();
+        return in.readByte();
+      }
+
+      public void readBytes(byte[] b, int offset, int len) throws IOException {
+        checkForDeleted();
+        in.readBytes(b, offset, len);
+      }
+
+      public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+        checkForDeleted();
+        in.readBytes(b, offset, len, useBuffer);
+      }
+
+      public long length() {
+        return in.length();
+      }
+
+      public void readChars(char[] buffer, int start, int length) throws IOException {
+        checkForDeleted();
+        in.readChars(buffer, start, length);
+      }
+
+      public Map<String, String> readStringStringMap() throws IOException {
+        checkForDeleted();
+        return in.readStringStringMap();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
new file mode 100644
index 0000000..06180dc
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -0,0 +1,105 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.junit.Test;
+
+
+/** Verifies that a row recorded by {@link TransactionRecorder} can be replayed into a new index. */
+public class TransactionRecorderTest {
+
+  /** Records one row under a fresh WAL path, replays that path into an empty index, expects one doc. */
+  @Test
+  public void testReplay() throws IOException {
+    String tmpPath = "./tmp/transaction-recorder/wal";
+    rm(new File(tmpPath));
+    KeywordAnalyzer analyzer = new KeywordAnalyzer();
+    Configuration configuration = new Configuration();
+    BlurAnalyzer blurAnalyzer = new BlurAnalyzer(analyzer);
+
+    TransactionRecorder transactionRecorder = new TransactionRecorder();
+    transactionRecorder.setAnalyzer(blurAnalyzer);
+    transactionRecorder.setConfiguration(configuration);
+    transactionRecorder.setWalPath(new Path(tmpPath));
+    transactionRecorder.init();
+    transactionRecorder.open();
+    try {
+      transactionRecorder.replaceRow(true, genRow(), null);
+      fail("Should NPE");
+    } catch (NullPointerException expected) {
+      // Expected for the null last argument; the replay below shows the row was recorded first.
+    }
+    transactionRecorder.close(); // so that the rawfs will flush the file to disk for reading
+
+    RAMDirectory directory = new RAMDirectory();
+    IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Version.LUCENE_35, analyzer));
+    TransactionRecorder replayTransactionRecorder = new TransactionRecorder();
+    replayTransactionRecorder.setAnalyzer(blurAnalyzer);
+    replayTransactionRecorder.setConfiguration(configuration);
+    replayTransactionRecorder.setWalPath(new Path(tmpPath));
+    replayTransactionRecorder.init();
+    replayTransactionRecorder.replay(writer);
+
+    IndexReader reader = IndexReader.open(directory);
+    int replayedDocs = reader.numDocs();
+    reader.close(); // released before asserting so a failed assertion cannot leak the reader
+    assertEquals(1, replayedDocs);
+  }
+
+  /** Recursively deletes {@code file}; a missing path is a no-op. */
+  private void rm(File file) {
+    File[] children = file.listFiles(); // null for non-directories, missing or unreadable paths
+    if (children != null) {
+      for (File child : children) {
+        rm(child);
+      }
+    }
+    file.delete(); // best effort: the result is ignored, as before
+  }
+
+  /** Builds row "1" holding a single record "1" in family "test" with one name/value column. */
+  private Row genRow() {
+    Record record = new Record();
+    record.recordId = "1";
+    record.family = "test";
+    record.addToColumns(new Column("name", "value"));
+    Row row = new Row();
+    row.id = "1";
+    row.addToRecords(record);
+    return row;
+  }
+
+}


Mime
View raw message