http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
new file mode 100644
index 0000000..58d2d3d
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -0,0 +1,890 @@
+package org.apache.blur.manager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.thrift.generated.RecordMutationType.APPEND_COLUMN_VALUES;
+import static org.apache.blur.thrift.generated.RecordMutationType.DELETE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_COLUMNS;
+import static org.apache.blur.thrift.generated.RecordMutationType.REPLACE_ENTIRE_RECORD;
+import static org.apache.blur.thrift.generated.RowMutationType.DELETE_ROW;
+import static org.apache.blur.thrift.generated.RowMutationType.UPDATE_ROW;
+import static org.apache.blur.utils.BlurUtil.match;
+import static org.apache.blur.utils.BlurUtil.newColumn;
+import static org.apache.blur.utils.BlurUtil.newRecord;
+import static org.apache.blur.utils.BlurUtil.newRecordMutation;
+import static org.apache.blur.utils.BlurUtil.newRow;
+import static org.apache.blur.utils.BlurUtil.newRowMutation;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.indexserver.LocalIndexServer;
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Facet;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.ScoreType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class IndexManagerTest {
+
+ private static final String SHARD_NAME = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, 0);
+ private static final String TABLE = "table";
+ private static final String FAMILY = "test-family";
+ private static final String FAMILY2 = "test-family2";
+ private LocalIndexServer server;
+ private IndexManager indexManager;
+
+ @Before
+ public void setUp() throws BlurException, IOException, InterruptedException {
+ File file = new File("./tmp/indexer-manager-test");
+ rm(file);
+ new File(new File(file, TABLE), SHARD_NAME).mkdirs();
+ server = new LocalIndexServer(file, new Path("./tmp/indexer-manager-test"));
+
+ indexManager = new IndexManager();
+ indexManager.setStatusCleanupTimerDelay(1000);
+ indexManager.setIndexServer(server);
+ indexManager.setThreadCount(1);
+ indexManager.setBlurMetrics(new BlurMetrics(new Configuration()));
+ indexManager.init();
+ setupData();
+ }
+
+ @After
+ public void teardown() {
+ indexManager.close();
+ indexManager = null;
+ server = null;
+ }
+
+ private void rm(File file) {
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ rm(f);
+ }
+ }
+ file.delete();
+ }
+
+ private void setupData() throws BlurException, IOException {
+ RowMutation mutation1 = newRowMutation(TABLE, "row-1",
+ newRecordMutation(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+ RowMutation mutation2 = newRowMutation(TABLE, "row-2",
+ newRecordMutation(FAMILY, "record-2", newColumn("testcol1", "value4"), newColumn("testcol2", "value5"), newColumn("testcol3", "value6")),
+ newRecordMutation(FAMILY, "record-2B", newColumn("testcol2", "value234123"), newColumn("testcol3", "value234123")));
+ RowMutation mutation3 = newRowMutation(TABLE, "row-3",
+ newRecordMutation(FAMILY, "record-3", newColumn("testcol1", "value7"), newColumn("testcol2", "value8"), newColumn("testcol3", "value9")));
+ RowMutation mutation4 = newRowMutation(TABLE, "row-4",
+ newRecordMutation(FAMILY, "record-4", newColumn("testcol1", "value1"), newColumn("testcol2", "value5"), newColumn("testcol3", "value9")),
+ newRecordMutation(FAMILY, "record-4B", newColumn("testcol2", "value234123"), newColumn("testcol3", "value234123")));
+ RowMutation mutation5 = newRowMutation(
+ TABLE,
+ "row-5",
+ newRecordMutation(FAMILY, "record-5A", newColumn("testcol1", "value13"), newColumn("testcol2", "value14"), newColumn("testcol3", "value15")),
+ newRecordMutation(FAMILY, "record-5B", newColumn("testcol1", "value16"), newColumn("testcol2", "value17"), newColumn("testcol3", "value18"),
+ newColumn("testcol3", "value19")));
+ RowMutation mutation6 = newRowMutation(TABLE, "row-6", newRecordMutation(FAMILY, "record-6A", newColumn("testcol12", "value110"), newColumn("testcol13", "value102")),
+ newRecordMutation(FAMILY, "record-6B", newColumn("testcol12", "value101"), newColumn("testcol13", "value104")),
+ newRecordMutation(FAMILY2, "record-6C", newColumn("testcol18", "value501")));
+ RowMutation mutation7 = newRowMutation(TABLE, "row-7", newRecordMutation(FAMILY, "record-7A", newColumn("testcol12", "value101"), newColumn("testcol13", "value102")),
+ newRecordMutation(FAMILY2, "record-7B", newColumn("testcol18", "value501")));
+ mutation7.waitToBeVisible = true;
+ indexManager.mutate(mutation1);
+ indexManager.mutate(mutation2);
+ indexManager.mutate(mutation3);
+ indexManager.mutate(mutation4);
+ indexManager.mutate(mutation5);
+ indexManager.mutate(mutation6);
+ indexManager.mutate(mutation7);
+ }
+
+ @Test
+ public void testQueryWithJoinAll() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "+test-family.testcol12:value101 +test-family.testcol13:value102 +test-family2.testcol18:value501";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+ }
+
+ @Test
+ public void testQueryWithJoin() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "+(+test-family.testcol12:value101 +test-family.testcol13:value102) +test-family2.testcol18:value501";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 1);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+ }
+
+ @Test
+ public void testQueryWithJoinForcingSuperQuery() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "+(+test-family.testcol1:value1 nojoin) +(+test-family.testcol3:value234123)";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 1);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+ }
+
+ @Test
+ public void testQueryWithFacetsWithWildCard() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+ blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value*", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+ AtomicLongArray facetedCounts = new AtomicLongArray(2);
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+
+ assertEquals(2, facetedCounts.get(0));
+ assertEquals(0, facetedCounts.get(1));
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testFetchRowByLocationId() throws Exception {
+ Selector selector = new Selector().setLocationId(SHARD_NAME + "/0");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult.row);
+ Row row = newRow("row-1", newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+ row.recordCount = 1;
+ assertEquals(row, fetchResult.rowResult.row);
+ }
+
+ @Test
+ public void testFetchMissingRowByLocationId() throws Exception {
+ try {
+ Selector selector = new Selector().setLocationId("shard4/0");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ fail("Should throw exception");
+ } catch (BlurException e) {
+ }
+ }
+
+ @Test
+ public void testFetchRecordByLocationId() throws Exception {
+ Selector selector = new Selector().setLocationId(SHARD_NAME + "/0").setRecordOnly(true);
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNull(fetchResult.rowResult);
+ assertNotNull(fetchResult.recordResult.record);
+
+ assertEquals("row-1", fetchResult.recordResult.rowid);
+ assertEquals("record-1", fetchResult.recordResult.record.recordId);
+ assertEquals(FAMILY, fetchResult.recordResult.record.family);
+
+ Record record = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+ assertEquals(record, fetchResult.recordResult.record);
+ }
+
+ @Test
+ public void testFetchRowByRowId() throws Exception {
+ Selector selector = new Selector().setRowId("row-1");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult.row);
+ Row row = newRow("row-1", newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3")));
+ row.recordCount = 1;
+ assertEquals(row, fetchResult.rowResult.row);
+ }
+
+ @Test
+ public void testFetchRowByRecordIdOnly() throws Exception {
+ Selector selector = new Selector().setRecordId("record-1");
+ FetchResult fetchResult = new FetchResult();
+ try {
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ fail("Invalid selector should throw exception.");
+ } catch (BlurException e) {
+ // do nothing, this is a pass
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testFetchRowByRecordIdOnlyNoRecordOnly() throws Exception {
+ Selector selector = new Selector().setRowId("row-1").setRecordId("record-1");
+ FetchResult fetchResult = new FetchResult();
+ try {
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ fail("Invalid selector should throw exception.");
+ } catch (BlurException e) {
+ // do nothing, this is a pass
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testFetchRowByRecordId() throws Exception {
+ Selector selector = new Selector().setRowId("row-1").setRecordId("record-1").setRecordOnly(true);
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertFalse(fetchResult.deleted);
+ assertTrue(fetchResult.exists);
+ assertEquals(TABLE, fetchResult.table);
+ assertNull(fetchResult.rowResult);
+ assertNotNull(fetchResult.recordResult);
+ FetchRecordResult recordResult = fetchResult.recordResult;
+ assertEquals(FAMILY, recordResult.record.family);
+ assertEquals("record-1", recordResult.record.recordId);
+ assertEquals("row-1", recordResult.rowid);
+
+ Record record = newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"), newColumn("testcol3", "value3"));
+ assertEquals(record, recordResult.record);
+
+ }
+
+ @Test
+ public void testRecordFrequency() throws Exception {
+ assertEquals(2, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "value1"));
+ assertEquals(0, indexManager.recordFrequency(TABLE, FAMILY, "testcol1", "NO VALUE"));
+ }
+
+ @Test
+ public void testSchema() throws Exception {
+ Schema schema = indexManager.schema(TABLE);
+ assertEquals(TABLE, schema.table);
+ Map<String, Set<String>> columnFamilies = schema.columnFamilies;
+ assertEquals(new TreeSet<String>(Arrays.asList(FAMILY, FAMILY2)), new TreeSet<String>(columnFamilies.keySet()));
+ assertEquals(new TreeSet<String>(Arrays.asList("testcol1", "testcol2", "testcol3", "testcol12", "testcol13")), new TreeSet<String>(columnFamilies.get(FAMILY)));
+ assertEquals(new TreeSet<String>(Arrays.asList("testcol18")), new TreeSet<String>(columnFamilies.get(FAMILY2)));
+ }
+
+ @Test
+ public void testQuerySuperQueryTrue() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(2, iterable.getTotalResults());
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testQuerySuperQueryTrueWithSelector() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+ blurQuery.selector = new Selector();
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ assertNotNull(result.fetchResult.rowResult);
+ assertNull(result.fetchResult.recordResult);
+ }
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testQuerySuperQueryFalse() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = false;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId()).setRecordOnly(true);
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNull(fetchResult.rowResult);
+ assertNotNull(fetchResult.recordResult);
+ }
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testQuerySuperQueryFalseWithSelector() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = false;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+ blurQuery.selector = new Selector();
+ blurQuery.selector.setRecordOnly(true);
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ assertNull(result.fetchResult.rowResult);
+ assertNotNull(result.fetchResult.recordResult);
+ }
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testQueryRecordOnly() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.selector = new Selector();
+ blurQuery.selector.setRecordOnly(true);
+
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, null);
+ assertEquals(iterable.getTotalResults(), 2);
+
+ int matchRecord1 = 0;
+ int matchRecord4 = 0;
+
+ for (BlurResult result : iterable) {
+ assertNull(result.fetchResult.rowResult);
+ assertNotNull(result.fetchResult.recordResult);
+
+ Record r = result.fetchResult.recordResult.record;
+
+ if (r.getRecordId().equals("record-1")) {
+ matchRecord1 += 1;
+ } else if (r.getRecordId().equals("record-4")) {
+ matchRecord4 += 1;
+ } else {
+ fail("Unexpected record ID [" + r.getRecordId() + "]");
+ }
+ }
+
+ assertEquals("Unexpected number of record-1 results", 1, matchRecord1);
+ assertEquals("Unexpected number of record-4 results", 1, matchRecord4);
+ }
+
+ @Test
+ public void testQueryWithFacets() throws Exception {
+ BlurQuery blurQuery = new BlurQuery();
+ blurQuery.simpleQuery = new SimpleQuery();
+ blurQuery.simpleQuery.queryStr = "test-family.testcol1:value1";
+ blurQuery.simpleQuery.superQueryOn = true;
+ blurQuery.simpleQuery.type = ScoreType.SUPER;
+ blurQuery.fetch = 10;
+ blurQuery.minimumNumberOfResults = Long.MAX_VALUE;
+ blurQuery.maxQueryTime = Long.MAX_VALUE;
+ blurQuery.uuid = 1;
+ blurQuery.facets = Arrays.asList(new Facet("test-family.testcol1:value1", Long.MAX_VALUE), new Facet("test-family.testcol1:value-nohit", Long.MAX_VALUE));
+
+ AtomicLongArray facetedCounts = new AtomicLongArray(2);
+ BlurResultIterable iterable = indexManager.query(TABLE, blurQuery, facetedCounts);
+ assertEquals(iterable.getTotalResults(), 2);
+ for (BlurResult result : iterable) {
+ Selector selector = new Selector().setLocationId(result.getLocationId());
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult);
+ assertNull(fetchResult.recordResult);
+ }
+
+ assertEquals(2, facetedCounts.get(0));
+ assertEquals(0, facetedCounts.get(1));
+
+ assertFalse(indexManager.currentQueries(TABLE).isEmpty());
+ Thread.sleep(2000);// wait for cleanup to fire
+ assertTrue(indexManager.currentQueries(TABLE).isEmpty());
+ }
+
+ @Test
+ public void testTerms() throws Exception {
+ List<String> terms = indexManager.terms(TABLE, FAMILY, "testcol1", "", (short) 100);
+ assertEquals(Arrays.asList("value1", "value13", "value16", "value4", "value7"), terms);
+ }
+
+ @Test
+ public void testMutationReplaceRow() throws Exception {
+ RowMutation mutation = newRowMutation(TABLE, "row-4",
+ newRecordMutation(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4")));
+ mutation.waitToBeVisible = true;
+ indexManager.mutate(mutation);
+
+ Selector selector = new Selector().setRowId("row-4");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull(fetchResult.rowResult.row);
+ Row row = newRow("row-4", newRecord(FAMILY, "record-4", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"), newColumn("testcol3", "value4")));
+ row.recordCount = 1;
+ assertEquals(row, fetchResult.rowResult.row);
+ }
+
+ @Test
+ public void testMutationReplaceMissingRow() throws Exception {
+ Column c1 = newColumn("testcol1", "value20");
+ Column c2 = newColumn("testcol2", "value21");
+ Column c3 = newColumn("testcol3", "value22");
+ String rec = "record-6";
+ RecordMutation rm = newRecordMutation(FAMILY, rec, c1, c2, c3);
+ RowMutation mutation = newRowMutation(TABLE, "row-6", rm);
+ mutation.waitToBeVisible = true;
+ indexManager.mutate(mutation);
+
+ Selector selector = new Selector().setRowId("row-6");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ Row r = fetchResult.rowResult.row;
+ assertNotNull("new row should exist", r);
+ Row row = newRow("row-6", newRecord(FAMILY, "record-6", newColumn("testcol1", "value20"), newColumn("testcol2", "value21"), newColumn("testcol3", "value22")));
+ row.recordCount = 1;
+ assertEquals("row should match", row, r);
+ }
+
+ @Test
+ public void testMutationDeleteRow() throws Exception {
+ RowMutation mutation = newRowMutation(DELETE_ROW, TABLE, "row-2");
+ mutation.waitToBeVisible = true;
+ indexManager.mutate(mutation);
+
+ Selector selector = new Selector().setRowId("row-2");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNull("row should be deleted", fetchResult.rowResult);
+ }
+
+ @Test
+ public void testMutationDeleteMissingRow() throws Exception {
+ RowMutation mutation = newRowMutation(DELETE_ROW, TABLE, "row-6");
+ mutation.waitToBeVisible = true;
+ indexManager.mutate(mutation);
+
+ Selector selector = new Selector().setRowId("row-6");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNull("row should not exist", fetchResult.rowResult);
+ }
+
+ @Test
+ public void testMutationUpdateRowDeleteLastRecord() throws Exception {
+ RecordMutation rm = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-3");
+
+ RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, "row-3", rm);
+
+ rowMutation.waitToBeVisible = true;
+ indexManager.mutate(rowMutation);
+
+ Selector selector = new Selector().setRowId("row-3");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNull("row should not exist", fetchResult.rowResult);
+ }
+
+ @Test
+ public void testMutationUpdateRowDeleteRecord() throws Exception {
+ RecordMutation rm = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-5A");
+
+ RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, "row-5", rm);
+ rowMutation.waitToBeVisible = true;
+ indexManager.mutate(rowMutation);
+
+ Selector selector = new Selector().setRowId("row-5");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ assertNotNull("row should exist", fetchResult.rowResult);
+ assertNotNull("row should exist", fetchResult.rowResult.row);
+ assertEquals("row should have one record", 1, fetchResult.rowResult.row.getRecordsSize());
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateMissingRowDeleteRecord() throws Exception {
+ RecordMutation rm = newRecordMutation(DELETE_ENTIRE_RECORD, FAMILY, "record-101");
+
+ RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, "row-101", rm);
+ rowMutation.waitToBeVisible = true;
+ indexManager.mutate(rowMutation);
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceExistingRecord() throws Exception {
+ Column c1 = newColumn("testcol4", "value104");
+ Column c2 = newColumn("testcol5", "value105");
+ Column c3 = newColumn("testcol6", "value105");
+ String rec = "record-5A";
+ RecordMutation rm = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, rec, c1, c2, c3);
+
+ Record r = updateAndFetchRecord("row-5", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ assertTrue("column 3 should be in record", r.columns.contains(c3));
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceMissingRecord() throws Exception {
+ Column c1 = newColumn("testcol4", "value104");
+ Column c2 = newColumn("testcol5", "value105");
+ Column c3 = newColumn("testcol6", "value105");
+ String rec = "record-5C";
+ RecordMutation rm = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, rec, c1, c2, c3);
+
+ Record r = updateAndFetchRecord("row-5", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ assertTrue("column 3 should be in record", r.columns.contains(c3));
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceMixedRecords() throws Exception {
+ Column c1 = newColumn("testcol4", "value104");
+ Column c2 = newColumn("testcol5", "value105");
+ Column c3 = newColumn("testcol6", "value105");
+ RecordMutation rm1 = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5A", c1, c2, c3);
+ Column c4 = newColumn("testcol4", "value104");
+ Column c5 = newColumn("testcol5", "value105");
+ Column c6 = newColumn("testcol6", "value105");
+ RecordMutation rm2 = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, "record-5C", c4, c5, c6);
+
+ RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, "row-5", rm1, rm2);
+ rowMutation.waitToBeVisible = true;
+ indexManager.mutate(rowMutation);
+
+ Selector selector = new Selector().setRowId("row-5");
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ Row r = fetchResult.rowResult.row;
+ assertNotNull("row should exist", r);
+ assertEquals("only 3 records in row", 3, r.getRecordsSize());
+ int rm1Matches = 0;
+ int rm2Matches = 0;
+ int nonMatches = 0;
+ for (Record record : r.records) {
+ if (match(rm1, record)) {
+ rm1Matches += 1;
+ } else if (match(rm2, record)) {
+ rm2Matches += 1;
+ } else {
+ nonMatches += 1;
+ }
+ }
+ assertEquals("matching record should be updated", 1, rm1Matches);
+ assertEquals("missing record should be added", 1, rm2Matches);
+ assertEquals("unmodified record should exist", 1, nonMatches);
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateMissingRowReplaceRecord() throws Exception {
+ Column c1 = newColumn("testcol1", "value104");
+ Column c2 = newColumn("testcol2", "value105");
+ Column c3 = newColumn("testcol3", "value105");
+ String rec = "record-100";
+ RecordMutation rm = newRecordMutation(REPLACE_ENTIRE_RECORD, FAMILY, rec, c1, c2, c3);
+
+ updateAndFetchRecord("row-100", rec, rm);
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceExistingColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol2", "value9999");
+ String rec = "record-1";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+ Record r = updateAndFetchRecord("row-1", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ boolean foundUnmodifiedColumn = false;
+ for (Column column : r.columns) {
+ if (column.name.equals("testcol3") && column.value.equals("value3")) {
+ foundUnmodifiedColumn = true;
+ break;
+ }
+ }
+ assertTrue("column 3 should be unmodified", foundUnmodifiedColumn);
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceExistingDuplicateColumns() throws Exception {
+ Column c = newColumn("testcol3", "value999");
+ String rec = "record-5B";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c);
+
+ Record r = updateAndFetchRecord("row-5", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 3 columns in record", 3, r.getColumnsSize());
+ assertTrue("new column should be in record", r.columns.contains(c));
+ boolean foundDuplicateColumn = false;
+ for (Column column : r.columns) {
+ if (column.name.equals(c.name) && !column.value.equals(c.value)) {
+ foundDuplicateColumn = true;
+ break;
+ }
+ }
+ assertFalse("duplicate columns should be removed", foundDuplicateColumn);
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceMissingColumns() throws Exception {
+ Column c1 = newColumn("testcol4", "value999");
+ Column c2 = newColumn("testcol5", "value9999");
+ String rec = "record-1";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+ Record r = updateAndFetchRecord("row-1", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 5 columns in record", 5, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ }
+
+ @Test
+ public void testMutationUpdateRowReplaceMixedColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol4", "value9999");
+ String rec = "record-1";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+ Record r = updateAndFetchRecord("row-1", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 4 columns in record", 4, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateRowMissingRecordReplaceColumns() throws Exception {
+ Column c1 = newColumn("testcol4", "value999");
+ Column c2 = newColumn("testcol5", "value9999");
+ String rec = "record-1B";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+ updateAndFetchRecord("row-1", rec, rm);
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateMissingRowReplaceColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol2", "value9999");
+ String rec = "record-6";
+ RecordMutation rm = newRecordMutation(REPLACE_COLUMNS, FAMILY, rec, c1, c2);
+
+ updateAndFetchRecord("row-6", rec, rm);
+ }
+
+ @Test
+ public void testMutationUpdateRowAppendColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol2", "value9999");
+ Column c3 = newColumn("testcol4", "hmm");
+ String rec = "record-1";
+ RecordMutation rm = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, rec, c1, c2, c3);
+
+ Record r = updateAndFetchRecord("row-1", rec, rm);
+
+ assertNotNull("record should exist", r);
+ assertEquals("only 6 columns in record", 6, r.getColumnsSize());
+ assertTrue("column 1 should be in record", r.columns.contains(c1));
+ assertTrue("column 2 should be in record", r.columns.contains(c2));
+ assertTrue("column 3 should be in record", r.columns.contains(c3));
+ int numTestcol1 = 0;
+ int numTestcol2 = 0;
+ int numTestcol3 = 0;
+ int numTestcol4 = 0;
+ int others = 0;
+ for (Column column : r.columns) {
+ if (column.name.equals("testcol1")) {
+ numTestcol1 += 1;
+ } else if (column.name.equals("testcol2")) {
+ numTestcol2 += 1;
+ } else if (column.name.equals("testcol3")) {
+ numTestcol3 += 1;
+ } else if (column.name.equals("testcol4")) {
+ numTestcol4 += 1;
+ } else {
+ others += 1;
+ }
+ }
+ assertEquals("should append testcol1", 2, numTestcol1);
+ assertEquals("should append testcol2", 2, numTestcol2);
+ assertEquals("should not append testcol3", 1, numTestcol3);
+ assertEquals("should append testcol4", 1, numTestcol4);
+ assertEquals("should not find other columns", 0, others);
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateRowMissingRecordAppendColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol2", "value9999");
+ Column c3 = newColumn("testcol4", "hmm");
+ String rec = "record-1B";
+ RecordMutation rm = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, rec, c1, c2, c3);
+
+ updateAndFetchRecord("row-1", rec, rm);
+ }
+
+ @Test(expected = BlurException.class)
+ public void testMutationUpdateMissingRowAppendColumns() throws Exception {
+ Column c1 = newColumn("testcol1", "value999");
+ Column c2 = newColumn("testcol2", "value9999");
+ String rec = "record-6";
+ RecordMutation rm = newRecordMutation(APPEND_COLUMN_VALUES, FAMILY, rec, c1, c2);
+
+ updateAndFetchRecord("row-6", rec, rm);
+ }
+
+ private Record updateAndFetchRecord(String rowId, String recordId, RecordMutation... recordMutations) throws Exception {
+ RowMutation rowMutation = newRowMutation(UPDATE_ROW, TABLE, rowId, recordMutations);
+ rowMutation.waitToBeVisible = true;
+ indexManager.mutate(rowMutation);
+
+ Selector selector = new Selector().setRowId(rowId).setRecordId(recordId);
+ selector.setRecordOnly(true);
+ FetchResult fetchResult = new FetchResult();
+ indexManager.fetchRow(TABLE, selector, fetchResult);
+ return (fetchResult.recordResult != null ? fetchResult.recordResult.record : null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
new file mode 100644
index 0000000..cd55768
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -0,0 +1,255 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class ZookeeperClusterStatusTest {
+
+ private static final String TEST = "test";
+ private static final String DEFAULT = "default";
+
+ private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatusTest.class);
+ private ZooKeeper zooKeeper;
+ private ZookeeperClusterStatus clusterStatus;
+
+ public static class QuorumPeerMainRun extends QuorumPeerMain {
+ @Override
+ public void initializeAndRun(String[] args) throws ConfigException, IOException {
+ super.initializeAndRun(args);
+ }
+ }
+
+ @BeforeClass
+ public static void setupOnce() throws InterruptedException, IOException, KeeperException {
+ MiniCluster.startZooKeeper("./tmp/zk_test");
+ }
+
+ @AfterClass
+ public static void teardownOnce() {
+ MiniCluster.shutdownZooKeeper();
+ }
+
+ @Before
+ public void setup() throws KeeperException, InterruptedException, IOException {
+ zooKeeper = new ZooKeeper(MiniCluster.getZkConnectionString(), 30000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+
+ }
+ });
+ BlurUtil.setupZookeeper(zooKeeper, DEFAULT);
+ clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+ }
+
+ @After
+ public void teardown() throws InterruptedException {
+ clusterStatus.close();
+ zooKeeper.close();
+ }
+
+ @Test
+ public void testGetClusterList() {
+ LOG.warn("testGetClusterList");
+ List<String> clusterList = clusterStatus.getClusterList(false);
+ assertEquals(Arrays.asList(DEFAULT), clusterList);
+ }
+
+ @Test
+ public void testSafeModeNotSet() throws KeeperException, InterruptedException {
+ LOG.warn("testSafeModeNotSet");
+ assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+ new WaitForAnswerToBeCorrect(20L) {
+ @Override
+ public Object run() {
+ return clusterStatus.isInSafeMode(true, DEFAULT);
+ }
+ }.test(false);
+ }
+
+ @Test
+ public void testSafeModeSetInPast() throws KeeperException, InterruptedException {
+ LOG.warn("testSafeModeSetInPast");
+ setSafeModeInPast();
+ assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
+ new WaitForAnswerToBeCorrect(20L) {
+ @Override
+ public Object run() {
+ return clusterStatus.isInSafeMode(true, DEFAULT);
+ }
+ }.test(false);
+ }
+
+ @Test
+ public void testSafeModeSetInFuture() throws KeeperException, InterruptedException {
+ LOG.warn("testSafeModeSetInFuture");
+ setSafeModeInFuture();
+ assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
+ new WaitForAnswerToBeCorrect(20L) {
+ @Override
+ public Object run() {
+ return clusterStatus.isInSafeMode(true, DEFAULT);
+ }
+ }.test(true);
+ }
+
+ @Test
+ public void testGetClusterNoTable() {
+ LOG.warn("testGetCluster");
+ assertNull(clusterStatus.getCluster(false, TEST));
+ assertNull(clusterStatus.getCluster(true, TEST));
+ }
+
+ @Test
+ public void testGetClusterTable() throws KeeperException, InterruptedException {
+ LOG.warn("testGetCluster");
+ createTable(TEST);
+ assertEquals(DEFAULT, clusterStatus.getCluster(false, TEST));
+ new WaitForAnswerToBeCorrect(20L) {
+ @Override
+ public Object run() {
+ return clusterStatus.getCluster(true, TEST);
+ }
+ }.test(DEFAULT);
+ }
+
+ @Test
+ public void testGetTableList() {
+ assertEquals(Arrays.asList(TEST), clusterStatus.getTableList(false));
+ }
+
+ @Test
+ public void testIsEnabledNoTable() {
+ assertFalse(clusterStatus.isEnabled(false, DEFAULT, "notable"));
+ assertFalse(clusterStatus.isEnabled(true, DEFAULT, "notable"));
+ }
+
+ @Test
+ public void testIsEnabledDisabledTable() throws KeeperException, InterruptedException {
+ createTable("disabledtable", false);
+ assertFalse(clusterStatus.isEnabled(false, DEFAULT, "disabledtable"));
+ assertFalse(clusterStatus.isEnabled(true, DEFAULT, "disabledtable"));
+ }
+
+ @Test
+ public void testIsEnabledEnabledTable() throws KeeperException, InterruptedException {
+ createTable("enabledtable", true);
+ assertTrue(clusterStatus.isEnabled(false, DEFAULT, "enabledtable"));
+ assertTrue(clusterStatus.isEnabled(true, DEFAULT, "enabledtable"));
+ }
+
+ private void createTable(String name) throws KeeperException, InterruptedException {
+ createTable(name, true);
+ }
+
+ private void createTable(String name, boolean enabled) throws KeeperException, InterruptedException {
+ TableDescriptor tableDescriptor = new TableDescriptor();
+ tableDescriptor.setName(name);
+ tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+ tableDescriptor.setTableUri("./tmp/zk_test_hdfs");
+ tableDescriptor.setIsEnabled(enabled);
+ clusterStatus.createTable(tableDescriptor);
+ if (enabled) {
+ clusterStatus.enableTable(tableDescriptor.getCluster(), name);
+ }
+ }
+
+ public abstract class WaitForAnswerToBeCorrect {
+
+ private long totalWaitTimeNanos;
+
+ public WaitForAnswerToBeCorrect(long totalWaitTimeMs) {
+ this.totalWaitTimeNanos = TimeUnit.MILLISECONDS.toNanos(totalWaitTimeMs);
+ }
+
+ public abstract Object run();
+
+ public void test(Object o) {
+ long start = System.nanoTime();
+ while (true) {
+ Object object = run();
+ if (object.equals(o) || object == o) {
+ return;
+ }
+ long now = System.nanoTime();
+ if (now - start > totalWaitTimeNanos) {
+ fail();
+ }
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+
+ private void setSafeModeInPast() throws KeeperException, InterruptedException {
+ String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+ Stat stat = zooKeeper.exists(blurSafemodePath, false);
+ byte[] data = Long.toString(System.currentTimeMillis() - 60000).getBytes();
+ if (stat == null) {
+ zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ zooKeeper.setData(blurSafemodePath, data, -1);
+ }
+
+ private void setSafeModeInFuture() throws KeeperException, InterruptedException {
+ String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+ Stat stat = zooKeeper.exists(blurSafemodePath, false);
+ byte[] data = Long.toString(System.currentTimeMillis() + 60000).getBytes();
+ if (stat == null) {
+ zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ zooKeeper.setData(blurSafemodePath, data, -1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
new file mode 100644
index 0000000..f3136ea
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/DistributedLayoutManagerTest.java
@@ -0,0 +1,116 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
+import org.junit.Test;
+
+
+public class DistributedLayoutManagerTest {
+
+ @Test
+ public void testLayoutManager() {
+ TreeSet<String> nodes = new TreeSet<String>();
+ nodes.add("n1");
+ nodes.add("n2");
+ nodes.add("n3");
+
+ TreeSet<String> nodesOffline = new TreeSet<String>();
+ nodesOffline.add("n2");
+
+ TreeSet<String> shards = new TreeSet<String>();
+ shards.add("s1");
+ shards.add("s2");
+ shards.add("s3");
+ shards.add("s4");
+ shards.add("s5");
+
+ DistributedLayoutManager layoutManager1 = new DistributedLayoutManager();
+ layoutManager1.setNodes(nodes);
+ layoutManager1.setShards(shards);
+ layoutManager1.init();
+ Map<String, String> layout1 = layoutManager1.getLayout();
+
+ DistributedLayoutManager layoutManager2 = new DistributedLayoutManager();
+ layoutManager2.setNodes(nodes);
+ layoutManager2.setShards(shards);
+ layoutManager2.setNodesOffline(nodesOffline);
+ layoutManager2.init();
+ Map<String, String> layout2 = layoutManager2.getLayout();
+
+ assertEquals(shards, new TreeSet<String>(layout1.keySet()));
+ assertEquals(nodes, new TreeSet<String>(layout1.values()));
+
+ assertEquals(shards, new TreeSet<String>(layout2.keySet()));
+ TreeSet<String> nodesOnline = new TreeSet<String>(nodes);
+ nodesOnline.removeAll(nodesOffline);
+ assertEquals(nodesOnline, new TreeSet<String>(layout2.values()));
+
+ }
+
+ @Test
+ public void testLayoutManagerPerformance() {
+ DistributedLayoutManager perfTest = new DistributedLayoutManager();
+ perfTest.setNodes(getTestNodes());
+ perfTest.setShards(getTestShards());
+ perfTest.setNodesOffline(getTestOfflineNodes());
+ perfTest.init();
+ int testSize = 100000;
+ for (int i = 0; i < testSize; i++) {
+ perfTest.getLayout();
+ }
+ long s = System.nanoTime();
+ for (int i = 0; i < testSize; i++) {
+ perfTest.getLayout();
+ }
+ long e = System.nanoTime();
+ double ms = (e - s) / 1000000.0;
+ System.out.println("Total " + ms);
+ System.out.println("Per Call " + ms / testSize);
+ assertTrue(ms < 100);
+ }
+
+ private static Collection<String> getTestOfflineNodes() {
+ return Arrays.asList("n13");
+ }
+
+ private static Collection<String> getTestShards() {
+ Collection<String> shards = new HashSet<String>();
+ for (int i = 0; i < 701; i++) {
+ shards.add("s" + i);
+ }
+ return shards;
+ }
+
+ private static Collection<String> getTestNodes() {
+ Collection<String> nodes = new HashSet<String>();
+ for (int i = 0; i < 32; i++) {
+ nodes.add("n" + i);
+ }
+ return nodes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
new file mode 100644
index 0000000..02107fd
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparatorTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.blur.manager.results.PeekableIterator;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+import org.junit.Test;
+
+
+public class BlurResultPeekableIteratorComparatorTest {
+
+ @Test
+ public void testResultPeekableIteratorComparator() {
+ List<PeekableIterator<BlurResult>> results = new ArrayList<PeekableIterator<BlurResult>>();
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("5", 5))).iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("2", 2))).iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("1", 1))).iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>(Arrays.asList(newResult("9", 1))).iterator()));
+ results.add(new PeekableIterator<BlurResult>(new ArrayList<BlurResult>().iterator()));
+
+ Collections.sort(results, BlurConstants.HITS_PEEKABLE_ITERATOR_COMPARATOR);
+
+ for (PeekableIterator<BlurResult> iterator : results) {
+ System.out.println(iterator.peek());
+ }
+ }
+
+ private BlurResult newResult(String id, double score) {
+ return new BlurResult(id, score, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
new file mode 100644
index 0000000..7207eea
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/MultipleBlurResultIterableTest.java
@@ -0,0 +1,55 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.results.BlurResultIterableMultiple;
+import org.apache.blur.manager.results.BlurResultIterableSimple;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.junit.Test;
+
+
+public class MultipleBlurResultIterableTest {
+
+ @Test
+ public void testMultipleHitsIterable() {
+ BlurResultIterableMultiple iterable = new BlurResultIterableMultiple();
+ iterable.addBlurResultIterable(newBlurResultIterable(0, 0.1, 3, 2, 9, 10, 2));
+ iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 9, 1, 34, 53, 12));
+ iterable.addBlurResultIterable(newBlurResultIterable(4, 3));
+ iterable.addBlurResultIterable(newBlurResultIterable(7, 2, 34, 132));
+ iterable.addBlurResultIterable(newBlurResultIterable());
+
+ for (BlurResult hit : iterable) {
+ System.out.println(hit);
+ }
+ }
+
+ private BlurResultIterable newBlurResultIterable(double... ds) {
+ List<BlurResult> results = new ArrayList<BlurResult>();
+ for (double d : ds) {
+ results.add(new BlurResult(UUID.randomUUID().toString() + "-" + Double.toString(d), d, null));
+ }
+ return new BlurResultIterableSimple(UUID.randomUUID().toString(), results);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
new file mode 100644
index 0000000..f590ef0
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/results/PeekableIteratorTest.java
@@ -0,0 +1,54 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.blur.manager.results.PeekableIterator;
+import org.junit.Test;
+
+
+public class PeekableIteratorTest {
+
+ @Test
+ public void testPeekableIterator1() {
+ PeekableIterator<Integer> iterator = new PeekableIterator<Integer>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).iterator());
+ while (iterator.hasNext()) {
+ for (int i = 0; i < 3; i++) {
+ System.out.println(iterator.peek());
+ }
+ System.out.println(iterator.next());
+ }
+ }
+
+ @Test
+ public void testPeekableIteratorEmpty() {
+ PeekableIterator<Integer> iterator = new PeekableIterator<Integer>(new ArrayList<Integer>().iterator());
+ for (int i = 0; i < 3; i++) {
+ System.out.println(iterator.peek());
+ }
+ while (iterator.hasNext()) {
+ for (int i = 0; i < 3; i++) {
+ System.out.println(iterator.peek());
+ }
+ System.out.println(iterator.next());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
new file mode 100644
index 0000000..84164b1
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/stats/WeightedAvgTest.java
@@ -0,0 +1,40 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.blur.manager.stats.WeightedAvg;
+import org.junit.Test;
+
+
+public class WeightedAvgTest {
+
+ @Test
+ public void testAvg() {
+ int maxSize = 10;
+ WeightedAvg weightedAvg = new WeightedAvg(maxSize);
+ double total = 0;
+ for (int i = 1; i <= maxSize; i++) {
+ weightedAvg.add(i);
+ total += i;
+ assertTrue(Double.compare(total / i, weightedAvg.getAvg()) == 0);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
new file mode 100644
index 0000000..9e402f0
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -0,0 +1,154 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class BlurNRTIndexTest {
+
+ private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
+ private static final int TEST_NUMBER = 50000;
+ private BlurNRTIndex writer;
+ private BlurIndexCloser closer;
+ private Random random = new Random();
+ private BlurIndexRefresher refresher;
+ private ExecutorService service;
+ private File base;
+
+ @Before
+ public void setup() throws IOException {
+ base = new File("./tmp/blur-index-writer-test");
+ rm(base);
+ base.mkdirs();
+ closer = new BlurIndexCloser();
+ closer.init();
+
+ Configuration configuration = new Configuration();
+
+ BlurAnalyzer analyzer = new BlurAnalyzer(new KeywordAnalyzer());
+
+ refresher = new BlurIndexRefresher();
+ refresher.init();
+
+ writer = new BlurNRTIndex();
+ writer.setDirectory(FSDirectory.open(new File(base, "index")));
+ writer.setCloser(closer);
+ writer.setAnalyzer(analyzer);
+ writer.setSimilarity(new FairSimilarity());
+ writer.setTable("testing-table");
+ writer.setShard("testing-shard");
+
+ service = Executors.newThreadPool("test", 10);
+ writer.setWalPath(new Path(new File(base, "wal").toURI()));
+
+ writer.setConfiguration(configuration);
+ writer.setTimeBetweenRefreshs(25);
+ writer.init();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ refresher.close();
+ writer.close();
+ closer.close();
+ service.shutdownNow();
+ rm(base);
+ }
+
+ private void rm(File file) {
+ if (!file.exists()) {
+ return;
+ }
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ rm(f);
+ }
+ }
+ file.delete();
+ }
+
+ @Test
+ public void testBlurIndexWriter() throws IOException {
+ long s = System.nanoTime();
+ int total = 0;
+ for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
+ writer.replaceRow(true, true, genRow());
+ IndexReader reader = writer.getIndexReader();
+ assertEquals(i + 1, reader.numDocs());
+ total++;
+ }
+ long e = System.nanoTime();
+ double seconds = (e - s) / 1000000000.0;
+ double rate = total / seconds;
+ System.out.println("Rate " + rate);
+ IndexReader reader = writer.getIndexReader();
+ assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
+ }
+
+ @Test
+ public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
+ long s = System.nanoTime();
+ int total = 0;
+ for (int i = 0; i < TEST_NUMBER; i++) {
+ writer.replaceRow(false, true, genRow());
+ total++;
+ }
+ long e = System.nanoTime();
+ double seconds = (e - s) / 1000000000.0;
+ double rate = total / seconds;
+ System.out.println("Rate " + rate);
+ writer.refresh();
+ IndexReader reader = writer.getIndexReader();
+ assertEquals(TEST_NUMBER, reader.numDocs());
+ }
+
+ private Row genRow() {
+ Row row = new Row();
+ row.setId(Long.toString(random.nextLong()));
+ Record record = new Record();
+ record.setFamily("testing");
+ record.setRecordId(Long.toString(random.nextLong()));
+ for (int i = 0; i < 10; i++) {
+ record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+ }
+ row.addToRecords(record);
+ return row;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
new file mode 100644
index 0000000..c0999a8
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
@@ -0,0 +1,345 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.blur.manager.writer.DirectoryReferenceCounter;
+import org.apache.blur.manager.writer.DirectoryReferenceFileGC;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.junit.Test;
+
+
+public class DirectoryReferenceCounterTest {
+
+ @Test
+ public void testDirectoryReferenceCounterTestError() throws CorruptIndexException, IOException {
+ Directory directory = wrap(new RAMDirectory());
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, new KeywordAnalyzer());
+ IndexWriter writer = new IndexWriter(directory, conf);
+ int size = 100;
+ IndexReader[] readers = new IndexReader[size];
+ for (int i = 0; i < size; i++) {
+ writer.addDocument(getDoc());
+ readers[i] = IndexReader.open(writer, true);
+ writer.forceMerge(1);
+ }
+
+ try {
+ for (int i = 0; i < size; i++) {
+ checkReader(readers[i], i);
+ }
+ fail();
+ } catch (Exception e) {
+ // should error
+ }
+ }
+
+ @Test
+ public void testDirectoryReferenceCounter() throws CorruptIndexException, LockObtainFailedException, IOException, InterruptedException {
+ Directory directory = wrap(new RAMDirectory());
+ DirectoryReferenceFileGC gc = new DirectoryReferenceFileGC();
+ gc.init();
+ DirectoryReferenceCounter counter = new DirectoryReferenceCounter(directory, gc);
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, new KeywordAnalyzer());
+ IndexWriter writer = new IndexWriter(counter, conf);
+ int size = 100;
+ IndexReader[] readers = new IndexReader[size];
+ for (int i = 0; i < size; i++) {
+ writer.addDocument(getDoc());
+ writer.forceMerge(1);
+ readers[i] = IndexReader.open(writer, true);
+ }
+
+ for (int i = 0; i < size; i++) {
+ assertEquals(i + 1, readers[i].numDocs());
+ checkReader(readers[i], i);
+ }
+
+ String[] listAll = directory.listAll();
+
+ for (int i = 0; i < size - 1; i++) {
+ readers[i].close();
+ }
+
+ for (int i = 0; i < 1000; i++) {
+ gc.run();
+ Thread.sleep(1);
+ }
+
+ IndexReader last = readers[size - 1];
+
+ assertEquals(100, last.numDocs());
+
+ assertTrue(listAll.length > directory.listAll().length);
+
+ last.close();
+ writer.close();
+ gc.close();
+ }
+
+ private Document getDoc() {
+ Document document = new Document();
+ document.add(new Field("id", "value", Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+ return document;
+ }
+
+ private void checkReader(IndexReader indexReader, int size) throws CorruptIndexException, IOException {
+ for (int i = 0; i < size; i++) {
+ Document document = indexReader.document(i);
+ String value = document.get("id");
+ assertEquals(value, "value");
+ }
+ }
+
+ // This class is use simulate what would happen with a directory that will
+ // forcefully delete files even if they are still in use. e.g. HDFSDirectory
+ public static Directory wrap(final RAMDirectory ramDirectory) {
+ return new Directory() {
+ private Directory d = ramDirectory;
+ private Collection<String> deletedFiles = new LinkedBlockingQueue<String>();
+
+ @SuppressWarnings("deprecation")
+ public void touchFile(String name) throws IOException {
+ d.touchFile(name);
+ }
+
+ public void deleteFile(String name) throws IOException {
+ deletedFiles.add(name);
+ d.deleteFile(name);
+ }
+
+ public IndexOutput createOutput(String name) throws IOException {
+ return d.createOutput(name);
+ }
+
+ @SuppressWarnings("deprecation")
+ public void sync(String name) throws IOException {
+ d.sync(name);
+ }
+
+ public void sync(Collection<String> names) throws IOException {
+ d.sync(names);
+ }
+
+ public IndexInput openInput(String name) throws IOException {
+ return wrap(d.openInput(name), deletedFiles, name);
+ }
+
+ public IndexInput openInput(String name, int bufferSize) throws IOException {
+ return wrap(d.openInput(name, bufferSize), deletedFiles, name);
+ }
+
+ public void clearLock(String name) throws IOException {
+ d.clearLock(name);
+ }
+
+ public void close() throws IOException {
+ d.close();
+ }
+
+ public void setLockFactory(LockFactory lockFactory) throws IOException {
+ d.setLockFactory(lockFactory);
+ }
+
+ public String getLockID() {
+ return d.getLockID();
+ }
+
+ public void copy(Directory to, String src, String dest) throws IOException {
+ d.copy(to, src, dest);
+ }
+
+ public boolean equals(Object arg0) {
+ return d.equals(arg0);
+ }
+
+ public boolean fileExists(String name) throws IOException {
+ return d.fileExists(name);
+ }
+
+ @SuppressWarnings("deprecation")
+ public long fileModified(String name) throws IOException {
+ return d.fileModified(name);
+ }
+
+ public long fileLength(String name) throws IOException {
+ return d.fileLength(name);
+ }
+
+ public LockFactory getLockFactory() {
+ return d.getLockFactory();
+ }
+
+ public int hashCode() {
+ return d.hashCode();
+ }
+
+ public String[] listAll() throws IOException {
+ return d.listAll();
+ }
+
+ public Lock makeLock(String name) {
+ return d.makeLock(name);
+ }
+
+ public String toString() {
+ return d.toString();
+ }
+ };
+ }
+
+ @SuppressWarnings("deprecation")
+ public static IndexInput wrap(final IndexInput input, final Collection<String> deletedFiles, final String name) {
+ return new IndexInput() {
+ private IndexInput in = input;
+
+ private void checkForDeleted() throws IOException {
+ if (deletedFiles.contains(name)) {
+ throw new IOException("File [" + name + "] does not exist");
+ }
+ }
+
+ public void skipChars(int length) throws IOException {
+ checkForDeleted();
+ in.skipChars(length);
+ }
+
+ public void setModifiedUTF8StringsMode() {
+ in.setModifiedUTF8StringsMode();
+ }
+
+ public void close() throws IOException {
+ checkForDeleted();
+ in.close();
+ }
+
+ public short readShort() throws IOException {
+ checkForDeleted();
+ return in.readShort();
+ }
+
+ public void seek(long pos) throws IOException {
+ checkForDeleted();
+ in.seek(pos);
+ }
+
+ public int readInt() throws IOException {
+ checkForDeleted();
+ return in.readInt();
+ }
+
+ public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+ checkForDeleted();
+ in.copyBytes(out, numBytes);
+ }
+
+ public int readVInt() throws IOException {
+ checkForDeleted();
+ return in.readVInt();
+ }
+
+ public String toString() {
+ return in.toString();
+ }
+
+ public long readLong() throws IOException {
+ checkForDeleted();
+ return in.readLong();
+ }
+
+ public long readVLong() throws IOException {
+ checkForDeleted();
+ return in.readVLong();
+ }
+
+ public String readString() throws IOException {
+ checkForDeleted();
+ return in.readString();
+ }
+
+ public Object clone() {
+ return super.clone();
+ }
+
+ public boolean equals(Object obj) {
+ return in.equals(obj);
+ }
+
+ public long getFilePointer() {
+ return in.getFilePointer();
+ }
+
+ public int hashCode() {
+ return in.hashCode();
+ }
+
+ public byte readByte() throws IOException {
+ checkForDeleted();
+ return in.readByte();
+ }
+
+ public void readBytes(byte[] b, int offset, int len) throws IOException {
+ checkForDeleted();
+ in.readBytes(b, offset, len);
+ }
+
+ public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+ checkForDeleted();
+ in.readBytes(b, offset, len, useBuffer);
+ }
+
+ public long length() {
+ return in.length();
+ }
+
+ public void readChars(char[] buffer, int start, int length) throws IOException {
+ checkForDeleted();
+ in.readChars(buffer, start, length);
+ }
+
+ public Map<String, String> readStringStringMap() throws IOException {
+ checkForDeleted();
+ return in.readStringStringMap();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
new file mode 100644
index 0000000..06180dc
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -0,0 +1,105 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.junit.Test;
+
+
+public class TransactionRecorderTest {
+
+ @Test
+ public void testReplay() throws IOException {
+ String tmpPath = "./tmp/transaction-recorder/wal";
+ rm(new File(tmpPath));
+
+ KeywordAnalyzer analyzer = new KeywordAnalyzer();
+ Configuration configuration = new Configuration();
+ BlurAnalyzer blurAnalyzer = new BlurAnalyzer(analyzer);
+
+ TransactionRecorder transactionRecorder = new TransactionRecorder();
+ transactionRecorder.setAnalyzer(blurAnalyzer);
+ transactionRecorder.setConfiguration(configuration);
+
+ transactionRecorder.setWalPath(new Path(tmpPath));
+ transactionRecorder.init();
+ transactionRecorder.open();
+ try {
+ transactionRecorder.replaceRow(true, genRow(), null);
+ fail("Should NPE");
+ } catch (NullPointerException e) {
+ }
+ transactionRecorder.close(); // this is done so that the rawfs will flush
+ // the file to disk for reading
+
+ RAMDirectory directory = new RAMDirectory();
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
+ IndexWriter writer = new IndexWriter(directory, conf);
+
+ TransactionRecorder replayTransactionRecorder = new TransactionRecorder();
+ replayTransactionRecorder.setAnalyzer(blurAnalyzer);
+ replayTransactionRecorder.setConfiguration(configuration);
+ replayTransactionRecorder.setWalPath(new Path(tmpPath));
+ replayTransactionRecorder.init();
+
+ replayTransactionRecorder.replay(writer);
+ IndexReader reader = IndexReader.open(directory);
+ assertEquals(1, reader.numDocs());
+ }
+
+ private void rm(File file) {
+ if (!file.exists()) {
+ return;
+ }
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ rm(f);
+ }
+ }
+ file.delete();
+ }
+
+ private Row genRow() {
+ Row row = new Row();
+ row.id = "1";
+ Record record = new Record();
+ record.recordId = "1";
+ record.family = "test";
+ record.addToColumns(new Column("name", "value"));
+ row.addToRecords(record);
+ return row;
+ }
+
+}
|