phoenix-commits mailing list archives

From mujt...@apache.org
Subject [02/15] Rename package from org.apache.hadoop.hbase.index.* to org.apache.phoenix.index.* to fix classloader issue causing mutable index performance regression - https://issues.apache.org/jira/browse/PHOENIX-38
Date Sat, 15 Feb 2014 00:07:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestFailWithoutRetries.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestFailWithoutRetries.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestFailWithoutRetries.java
new file mode 100644
index 0000000..8d9d65e
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestFailWithoutRetries.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.example;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
+import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
+import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexSpecifierBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.BaseIndexCodec;
+
+/**
+ * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
+ * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
+ * the exception, and just return <tt>null</tt> to the client, which then just goes and retries.
+ */
+public class TestFailWithoutRetries {
+
+  private static final Log LOG = LogFactory.getLog(TestFailWithoutRetries.class);
+  @Rule
+  public TableName table = new TableName();
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private String getIndexTableName() {
+    return Bytes.toString(table.getTableName()) + "_index";
+  }
+
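+  /**
+   * Codec that always throws on index updates, so any attempted indexed write must fail rather
+   * than succeed.
+   */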
+  public static class FailingTestCodec extends BaseIndexCodec {
+
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+      throw new RuntimeException("Intentionally failing deletes for "
+          + TestFailWithoutRetries.class.getName());
+    }
+
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+      throw new RuntimeException("Intentionally failing upserts for "
+          + TestFailWithoutRetries.class.getName());
+    }
+
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    // setup and verify the config
+    Configuration conf = UTIL.getConfiguration();
+    IndexTestingUtils.setupConfig(conf);
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+    // start the cluster
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
+   * rethrowing the exception correctly?
+   * <p>
+   * We use a custom codec to enforce the thrown exception.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testQuickFailure() throws Exception {
+    // incorrectly set up indexing for the primary table - the target index table doesn't exist,
+    // so the write should fail back to the client quickly
+    byte[] family = Bytes.toBytes("family");
+    ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
+    // values are [col1]
+    fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+    // add the index family
+    builder.addIndexGroup(fam1);
+    // usually, we would create the index table here, but we don't for the sake of the test.
+
+    // setup the primary table
+    String primaryTable = Bytes.toString(table.getTableName());
+    HTableDescriptor pTable = new HTableDescriptor(primaryTable);
+    pTable.addFamily(new HColumnDescriptor(family));
+    // override the codec so we can use our test one
+    builder.build(pTable, FailingTestCodec.class);
+
+    // create the primary table
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    admin.createTable(pTable);
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // up the number of retries/wait time so it would be obvious if the client were actually retrying here
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
+    HTable primary = new HTable(conf, primaryTable);
+    primary.setAutoFlush(false, true);
+
+    // do a simple put that should be indexed
+    Put p = new Put(Bytes.toBytes("row"));
+    p.add(family, null, Bytes.toBytes("value"));
+    primary.put(p);
+    try {
+      primary.flushCommits();
+      fail("Shouldn't have gotten a successful write to the primary table");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      LOG.info("Correclty got a failure of the put!");
+    }
+    primary.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestApplyAndFilterDeletesFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestApplyAndFilterDeletesFilter.java
new file mode 100644
index 0000000..d57f36a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestApplyAndFilterDeletesFilter.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Test the filter to ensure that it correctly handles KVs of different types
+ */
+public class TestApplyAndFilterDeletesFilter {
+
+  private static final Set<ImmutableBytesPtr> EMPTY_SET = Collections
+      .<ImmutableBytesPtr> emptySet();
+  private byte[] row = Bytes.toBytes("row");
+  private byte[] family = Bytes.toBytes("family");
+  private byte[] qualifier = Bytes.toBytes("qualifier");
+  private byte[] value = Bytes.toBytes("value");
+  private long ts = 10;
+
+  @Test
+  public void testDeletesAreNotReturned() {
+    KeyValue kv = createKvForType(Type.Delete);
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    assertEquals("Didn't skip point delete!", ReturnCode.SKIP, filter.filterKeyValue(kv));
+
+    filter.reset();
+    kv = createKvForType(Type.DeleteColumn);
+    assertEquals("Didn't skip from column delete!", ReturnCode.SKIP, filter.filterKeyValue(kv));
+
+    filter.reset();
+    kv = createKvForType(Type.DeleteFamily);
+    assertEquals("Didn't skip from family delete!", ReturnCode.SKIP, filter.filterKeyValue(kv));
+  }
+
+  /**
+   * Hinting with this filter is a little convoluted as we binary search the list of families to
+   * attempt to find the right one to seek.
+   */
+  @Test
+  public void testHintCorrectlyToNextFamily() {
+    // start with doing a family delete, so we will seek to the next column
+    KeyValue kv = createKvForType(Type.DeleteFamily);
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
+    KeyValue next = createKvForType(Type.Put);
+    // make sure the hint is our attempt at the end key, because we have no more families to seek
+    assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
+      filter.filterKeyValue(next));
+    assertEquals("Didn't get END_KEY with no families to match", KeyValue.LOWESTKEY,
+      filter.getNextKeyHint(next));
+
+    // check for a family that comes before our family, so we always seek to the end as well
+    filter = new ApplyAndFilterDeletesFilter(asSet(Bytes.toBytes("afamily")));
+    assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
+    // make sure the hint is our attempt at the end key, because we have no more families to seek
+    assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
+      filter.filterKeyValue(next));
+    assertEquals("Didn't get END_KEY with no families to match", KeyValue.LOWESTKEY,
+      filter.getNextKeyHint(next));
+
+    // check that we seek to the correct family that comes after our family
+    byte[] laterFamily = Bytes.toBytes("zfamily");
+    filter = new ApplyAndFilterDeletesFilter(asSet(laterFamily));
+    assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
+    KeyValue expected = KeyValue.createFirstOnRow(kv.getRow(), laterFamily, new byte[0]);
+    assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
+      filter.filterKeyValue(next));
+    assertEquals("Didn't get correct next key with a next family", expected,
+      filter.getNextKeyHint(next));
+  }
+
+  /**
+   * Point deletes should only cover the exact entry they are tied to. Earlier puts should always
+   * show up.
+   */
+  @Test
+  public void testCoveringPointDelete() {
+    // start with doing a family delete, so we will seek to the next column
+    KeyValue kv = createKvForType(Type.Delete);
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    filter.filterKeyValue(kv);
+    KeyValue put = createKvForType(Type.Put);
+    assertEquals("Didn't filter out put with same timestamp!", ReturnCode.SKIP,
+      filter.filterKeyValue(put));
+    // we should filter out the exact same put again, which could occur with the kvs all kept in the
+    // same memstore
+    assertEquals("Didn't filter out put with same timestamp on second call!", ReturnCode.SKIP,
+      filter.filterKeyValue(put));
+
+    // ensure then that we don't filter out a put with an earlier timestamp (though everything else
+    // matches)
+    put = createKvForType(Type.Put, ts - 1);
+    assertEquals("Didn't accept put that has an earlier ts than the covering delete!",
+      ReturnCode.INCLUDE, filter.filterKeyValue(put));
+  }
+
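+  /** Build a KeyValue of the given type for the standard test row/family/qualifier and value. */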
+  private KeyValue createKvForType(Type t) {
+    return createKvForType(t, this.ts);
+  }
+
+  private KeyValue createKvForType(Type t, long timestamp) {
+    return new KeyValue(row, family, qualifier, 0, qualifier.length, timestamp, t, value, 0,
+        value.length);
+  }
+
+  /**
+   * Test that when we do a column delete at a given timestamp that we delete the entire column.
+   * @throws Exception
+   */
+  @Test
+  public void testCoverForDeleteColumn() throws Exception {
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    KeyValue dc = createKvForType(Type.DeleteColumn, 11);
+    KeyValue put = createKvForType(Type.Put, 10);
+    assertEquals("Didn't filter out delete column.", ReturnCode.SKIP, filter.filterKeyValue(dc));
+    assertEquals("Didn't get a seek hint for the deleted column", ReturnCode.SEEK_NEXT_USING_HINT,
+      filter.filterKeyValue(put));
+    // seek past the given put
+    KeyValue seek = filter.getNextKeyHint(put);
+    assertTrue("Seeked key wasn't past the expected put - didn't skip the column",
+      KeyValue.COMPARATOR.compare(seek, put) > 0);
+  }
+
+  /**
+   * DeleteFamily markers should delete everything from that timestamp backwards, but not hide
+   * anything forwards
+   */
+  @Test
+  public void testDeleteFamilyCorrectlyCoversColumns() {
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    KeyValue df = createKvForType(Type.DeleteFamily, 11);
+    KeyValue put = createKvForType(Type.Put, 12);
+
+    assertEquals("Didn't filter out delete family", ReturnCode.SKIP, filter.filterKeyValue(df));
+    assertEquals("Filtered out put with newer TS than delete family", ReturnCode.INCLUDE,
+      filter.filterKeyValue(put));
+
+    // older kv shouldn't be visible
+    put = createKvForType(Type.Put, 10);
+    assertEquals("Didn't filter out older put, covered by DeleteFamily marker",
+      ReturnCode.SEEK_NEXT_USING_HINT, filter.filterKeyValue(put));
+
+    // next seek should be past the families
+    assertEquals(KeyValue.LOWESTKEY, filter.getNextKeyHint(put));
+  }
+
+  /**
+   * Test that we don't cover other columns when we have a delete column.
+   */
+  @Test
+  public void testDeleteColumnCorrectlyCoversColumns() {
+    ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
+    KeyValue d = createKvForType(Type.DeleteColumn, 12);
+    byte[] qual2 = Bytes.add(qualifier, Bytes.toBytes("-other"));
+    KeyValue put =
+        new KeyValue(row, family, qual2, 0, qual2.length, 11, Type.Put, value, 0,
+            value.length);
+
+    assertEquals("Didn't filter out delete column", ReturnCode.SKIP, filter.filterKeyValue(d));
+    // different column put should still be visible
+    assertEquals("Filtered out put with different column than the delete", ReturnCode.INCLUDE,
+      filter.filterKeyValue(put));
+
+    // set a delete family, but in the past
+    d = createKvForType(Type.DeleteFamily, 10);
+    assertEquals("Didn't filter out delete column", ReturnCode.SKIP, filter.filterKeyValue(d));
+    // add back in the original delete column
+    d = createKvForType(Type.DeleteColumn, 11);
+    assertEquals("Didn't filter out delete column", ReturnCode.SKIP, filter.filterKeyValue(d));
+    // the put is on a different column (and newer than the family delete), so it must still be visible
+    assertEquals("Filtered out put with different column than the delete", ReturnCode.INCLUDE,
+      filter.filterKeyValue(put));
+  }
+
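+  /** Wrap the given families as a set of ImmutableBytesPtrs for the filter. */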
+  private static Set<ImmutableBytesPtr> asSet(byte[]... strings) {
+    Set<ImmutableBytesPtr> set = new HashSet<ImmutableBytesPtr>();
+    for (byte[] s : strings) {
+      set.add(new ImmutableBytesPtr(s));
+    }
+    return set;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
new file mode 100644
index 0000000..216f548
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.filter.FamilyOnlyFilter;
+import org.junit.Test;
+
+/**
+ * Test that the family only filter only allows a single family through
+ */
+public class TestFamilyOnlyFilter {
+
+  byte[] row = new byte[] { 'a' };
+  byte[] qual = new byte[] { 'b' };
+  byte[] val = Bytes.toBytes("val");
+
+  @Test
+  public void testPassesFirstFamily() {
+    byte[] fam = Bytes.toBytes("fam");
+    byte[] fam2 = Bytes.toBytes("fam2");
+
+    FamilyOnlyFilter filter = new FamilyOnlyFilter(fam);
+
+    KeyValue kv = new KeyValue(row, fam, qual, 10, val);
+    ReturnCode code = filter.filterKeyValue(kv);
+    assertEquals("Didn't pass matching family!", ReturnCode.INCLUDE, code);
+
+    kv = new KeyValue(row, fam2, qual, 10, val);
+    code = filter.filterKeyValue(kv);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+  }
+
+  @Test
+  public void testPassesTargetFamilyAsNonFirstFamily() {
+    byte[] fam = Bytes.toBytes("fam");
+    byte[] fam2 = Bytes.toBytes("fam2");
+    byte[] fam3 = Bytes.toBytes("way_after_family");
+
+    FamilyOnlyFilter filter = new FamilyOnlyFilter(fam2);
+
+    KeyValue kv = new KeyValue(row, fam, qual, 10, val);
+
+    ReturnCode code = filter.filterKeyValue(kv);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+
+    kv = new KeyValue(row, fam2, qual, 10, val);
+    code = filter.filterKeyValue(kv);
+    assertEquals("Didn't pass matching family", ReturnCode.INCLUDE, code);
+
+    kv = new KeyValue(row, fam3, qual, 10, val);
+    code = filter.filterKeyValue(kv);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+  }
+
+  @Test
+  public void testResetFilter() {
+    byte[] fam = Bytes.toBytes("fam");
+    byte[] fam2 = Bytes.toBytes("fam2");
+    byte[] fam3 = Bytes.toBytes("way_after_family");
+
+    FamilyOnlyFilter filter = new FamilyOnlyFilter(fam2);
+
+    KeyValue kv = new KeyValue(row, fam, qual, 10, val);
+
+    ReturnCode code = filter.filterKeyValue(kv);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+
+    KeyValue accept = new KeyValue(row, fam2, qual, 10, val);
+    code = filter.filterKeyValue(accept);
+    assertEquals("Didn't pass matching family", ReturnCode.INCLUDE, code);
+
+    kv = new KeyValue(row, fam3, qual, 10, val);
+    code = filter.filterKeyValue(kv);
+    assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+
+    // we shouldn't match the family again - everything after a switched family should be ignored
+    code = filter.filterKeyValue(accept);
+    assertEquals("Should have skipped a 'matching' family if it arrives out of order",
+      ReturnCode.SKIP, code);
+
+    // reset the filter and we should accept it again
+    filter.reset();
+    code = filter.filterKeyValue(accept);
+    assertEquals("Didn't pass matching family after reset", ReturnCode.INCLUDE, code);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestNewerTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestNewerTimestampFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestNewerTimestampFilter.java
new file mode 100644
index 0000000..0677a38
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestNewerTimestampFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.filter.NewerTimestampFilter;
+import org.junit.Test;
+
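+/**
+ * Test that the NewerTimestampFilter only includes KVs at or below its timestamp and skips
+ * anything newer.
+ */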
+public class TestNewerTimestampFilter {
+  byte[] row = new byte[] { 'a' };
+  byte[] fam = Bytes.toBytes("family");
+  byte[] qual = new byte[] { 'b' };
+  byte[] val = Bytes.toBytes("val");
+
+  @Test
+  public void testOnlyAllowsOlderTimestamps() {
+    long ts = 100;
+    NewerTimestampFilter filter = new NewerTimestampFilter(ts);
+
+    KeyValue kv = new KeyValue(row, fam, qual, ts, val);
+    assertEquals("Didn't accept kv with matching ts", ReturnCode.INCLUDE, filter.filterKeyValue(kv));
+
+    kv = new KeyValue(row, fam, qual, ts + 1, val);
+    assertEquals("Didn't skip kv with greater ts", ReturnCode.SKIP, filter.filterKeyValue(kv));
+
+    kv = new KeyValue(row, fam, qual, ts - 1, val);
+    assertEquals("Didn't accept kv with lower ts", ReturnCode.INCLUDE, filter.filterKeyValue(kv));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
new file mode 100644
index 0000000..a0592f3
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+
+public class TestIndexUpdateManager {
+
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final String TABLE_NAME = "table";
+  private static final byte[] table = Bytes.toBytes(TABLE_NAME);
+
+  @Test
+  public void testMutationComparator() throws Exception {
+    IndexUpdateManager manager = new IndexUpdateManager();
+    Comparator<Mutation> comparator = manager.COMPARATOR;
+    Put p = new Put(row, 10);
+    // lexicographically earlier should sort earlier
+    Put p1 = new Put(Bytes.toBytes("ro"), 10);
+    assertTrue("lexicographically later sorting first, should be earlier first.",
+      comparator.compare(p, p1) > 0);
+    p1 = new Put(Bytes.toBytes("row1"), 10);
+    assertTrue("lexicographically later sorting first, should be earlier first.",
+      comparator.compare(p1, p) > 0);
+
+    // larger ts sorts before smaller, for the same row
+    p1 = new Put(row, 11);
+    assertTrue("Smaller timestamp sorting first, should be larger first.",
+      comparator.compare(p, p1) > 0);
+    // still true, even for deletes
+    Delete d = new Delete(row, 11);
+    assertTrue("Smaller timestamp sorting first, should be larger first.",
+      comparator.compare(p, d) > 0);
+
+    // for the same row and ts, the delete should sort earlier
+    d = new Delete(row, 10);
+    assertTrue("Delete doesn't sort before put, for the same row and ts",
+      comparator.compare(p, d) > 0);
+
+    // but for different rows, we still respect the row sorting.
+    d = new Delete(Bytes.toBytes("row1"), 10);
+    assertTrue("Delete doesn't sort before put, for the same row and ts",
+      comparator.compare(p, d) < 0);
+  }
+
+  /**
+   * When making updates we need to cancel out {@link Delete} and {@link Put}s for the same row.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testCancelingUpdates() throws Exception {
+    IndexUpdateManager manager = new IndexUpdateManager();
+
+    long ts1 = 10, ts2 = 11;
+    // at different timestamps, so both should be retained
+    Delete d = new Delete(row, ts1);
+    Put p = new Put(row, ts2);
+    manager.addIndexUpdate(table, d);
+    manager.addIndexUpdate(table, p);
+    List<Mutation> pending = new ArrayList<Mutation>();
+    pending.add(p);
+    pending.add(d);
+    validate(manager, pending);
+
+    // add a delete that should cancel out the put, leading to only one delete remaining
+    Delete d2 = new Delete(row, ts2);
+    manager.addIndexUpdate(table, d2);
+    pending.add(d);
+    validate(manager, pending);
+
+    // double-deletes of the same row only retain the existing one, which was already canceled out
+    // above
+    Delete d3 = new Delete(row, ts2);
+    manager.addIndexUpdate(table, d3);
+    pending.add(d);
+    validate(manager, pending);
+
+    // if there is just a put and a delete at the same ts, no pending updates should be returned
+    manager = new IndexUpdateManager();
+    manager.addIndexUpdate(table, d2);
+    manager.addIndexUpdate(table, p);
+    validate(manager, Collections.<Mutation> emptyList());
+
+    // different row insertions can be tricky too, if you don't get the base cases right
+    manager = new IndexUpdateManager();
+    manager.addIndexUpdate(table, p);
+    // this row definitely sorts after the current row
+    byte[] row1 = Bytes.toBytes("row1");
+    Put p1 = new Put(row1, ts1);
+    manager.addIndexUpdate(table, p1);
+    // this delete should completely cover the given put and both should be removed
+    Delete d4 = new Delete(row1, ts1);
+    manager.addIndexUpdate(table, d4);
+    pending.clear();
+    pending.add(p);
+    validate(manager, pending);
+  }
+
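+  /**
+   * Drain the manager's stored updates and check that each one is exactly (by reference) the next
+   * expected mutation in <tt>pending</tt>, all against the single test table.
+   */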
+  private void validate(IndexUpdateManager manager, List<Mutation> pending) {
+    for (Pair<Mutation, byte[]> entry : manager.toMap()) {
+      assertEquals("Table name didn't match for stored entry!", table, entry.getSecond());
+      Mutation m = pending.remove(0);
+      // test with == to match the exact entries, Mutation.equals just checks the row
+      assertTrue(
+        "Didn't get the expected mutation! Expected: " + m + ", but got: " + entry.getFirst(),
+        m == entry.getFirst());
+    }
+    assertTrue("Missing pending updates: " + pending, pending.isEmpty());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolBuilder.java
new file mode 100644
index 0000000..5ff7b8b
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+
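+/**
+ * Test that ThreadPoolBuilder always hands back sane (positive) values, even when the
+ * configuration sets nothing or sets negative values.
+ */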
+public class TestThreadPoolBuilder {
+
+  @Rule
+  public TableName name = new TableName();
+
+  @Test
+  public void testCoreThreadTimeoutNonZero() {
+    Configuration conf = new Configuration(false);
+    String key = name.getTableNameString()+"-key";
+    ThreadPoolBuilder builder = new ThreadPoolBuilder(name.getTableNameString(), conf);
+    assertTrue("core threads not set, but failed return", builder.getKeepAliveTime() > 0);
+    // set an negative value
+    builder.setCoreTimeout(key, -1);
+    assertTrue("core threads not set, but failed return", builder.getKeepAliveTime() > 0);
+    // set a positive value
+    builder.setCoreTimeout(key, 1234);
+    assertEquals("core threads not set, but failed return", 1234, builder.getKeepAliveTime());
+    // set an empty value
+    builder.setCoreTimeout(key);
+    assertTrue("core threads not set, but failed return", builder.getKeepAliveTime() > 0);
+  }
+  
+  @Test
+  public void testMaxThreadsNonZero() {
+    Configuration conf = new Configuration(false);
+    String key = name.getTableNameString()+"-key";
+    ThreadPoolBuilder builder = new ThreadPoolBuilder(name.getTableNameString(), conf);
+    assertTrue("core threads not set, but failed return", builder.getMaxThreads() > 0);
+    // set an negative value
+    builder.setMaxThread(key, -1);
+    assertTrue("core threads not set, but failed return", builder.getMaxThreads() > 0);
+    // set a positive value
+    builder.setMaxThread(key, 1234);
+    assertEquals("core threads not set, but failed return", 1234, builder.getMaxThreads());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolManager.java
new file mode 100644
index 0000000..24c30ac
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestThreadPoolManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+
+public class TestThreadPoolManager {
+
+  @Rule
+  public TableName name = new TableName();
+
+  @Test
+  public void testShutdownGetsNewThreadPool() throws Exception{
+    Map<String, Object> cache = new HashMap<String, Object>();
+    ThreadPoolBuilder builder = new ThreadPoolBuilder(name.getTableNameString(), new Configuration(false));
+    ThreadPoolExecutor exec = ThreadPoolManager.getExecutor(builder, cache);
+    assertNotNull("Got a null exector from the pool!", exec);
+    //shutdown the pool and ensure that it actually shutdown
+    exec.shutdown();
+    ThreadPoolExecutor exec2 = ThreadPoolManager.getExecutor(builder, cache);
+    assertFalse("Got the same exectuor, even though the original shutdown", exec2 == exec);
+  }
+
+  @Test
+  public void testShutdownWithReferencesDoesNotStopExecutor() throws Exception {
+    Map<String, Object> cache = new HashMap<String, Object>();
+    ThreadPoolBuilder builder =
+        new ThreadPoolBuilder(name.getTableNameString(), new Configuration(false));
+    ThreadPoolExecutor exec = ThreadPoolManager.getExecutor(builder, cache);
+    assertNotNull("Got a null exector from the pool!", exec);
+    ThreadPoolExecutor exec2 = ThreadPoolManager.getExecutor(builder, cache);
+    assertTrue("Should have gotten the same executor", exec2 == exec);
+    exec.shutdown();
+    assertFalse("Executor is shutting down, even though we have a live reference!",
+      exec.isShutdown() || exec.isTerminating());
+    exec2.shutdown();
+    // wait 5 minutes for thread pool to shutdown
+    assertTrue("Executor is NOT shutting down, after releasing live reference!",
+      exec.awaitTermination(300, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testGetExpectedExecutorForName() throws Exception {
+    Map<String, Object> cache = new HashMap<String, Object>();
+    ThreadPoolBuilder builder =
+        new ThreadPoolBuilder(name.getTableNameString(), new Configuration(false));
+    ThreadPoolExecutor exec = ThreadPoolManager.getExecutor(builder, cache);
+    assertNotNull("Got a null exector from the pool!", exec);
+    ThreadPoolExecutor exec2 = ThreadPoolManager.getExecutor(builder, cache);
+    assertTrue("Got a different exectuor, even though they have the same name", exec2 == exec);
+    builder = new ThreadPoolBuilder(name.getTableNameString(), new Configuration(false));
+    exec2 = ThreadPoolManager.getExecutor(builder, cache);
+    assertTrue(
+      "Got a different executor, even though they have the same name, but different confs",
+      exec2 == exec);
+
+    builder =
+        new ThreadPoolBuilder(name.getTableNameString() + "-some-other-pool", new Configuration(
+            false));
+    exec2 = ThreadPoolManager.getExecutor(builder, cache);
+    assertFalse(
+      "Got the same executor, even though the pools have different names",
+      exec2 == exec);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/util/TestIndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/util/TestIndexManagementUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/util/TestIndexManagementUtil.java
new file mode 100644
index 0000000..ef46ff4
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/util/TestIndexManagementUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALEditCodec;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.junit.Test;
+
+public class TestIndexManagementUtil {
+
+  @Test
+  public void testUncompressedWal() throws Exception {
+    Configuration conf = new Configuration(false);
+    // works with WALEditCodec
+    conf.set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName());
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+    // clear the codec and set the wal reader
+    conf = new Configuration(false);
+    conf.set(IndexManagementUtil.HLOG_READER_IMPL_KEY, IndexedHLogReader.class.getName());
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+  }
+
+  /**
+   * Compressed WALs are supported when we have the WALEditCodec installed
+   * @throws Exception
+   */
+  @Test
+  public void testCompressedWALWithCodec() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    // works with WALEditCodec
+    conf.set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName());
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+  }
+
+  /**
+   * We cannot support WAL Compression with the IndexedHLogReader
+   * @throws Exception
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testCompressedWALWithHLogReader() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    // only set the indexed reader - with WAL compression enabled this must fail
+    conf.set(IndexManagementUtil.HLOG_READER_IMPL_KEY, IndexedHLogReader.class.getName());
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
new file mode 100644
index 0000000..2b6be18
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Simple table factory that just looks up the tables based on name. Useful for mocking up
+ * {@link HTableInterface}s without having to mock up the factory too.
+ */
+class FakeTableFactory implements HTableFactory {
+
+  boolean shutdown = false;
+  private Map<ImmutableBytesPtr, HTableInterface> tables;
+
+  public FakeTableFactory(Map<ImmutableBytesPtr, HTableInterface> tables) {
+    this.tables = tables;
+  }
+
+  @Override
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    return this.tables.get(tablename);
+  }
+
+  @Override
+  public void shutdown() {
+    shutdown = true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
new file mode 100644
index 0000000..adf82f3
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+public class TestCachingHTableFactory {
+
+  @Test
+  public void testCacheCorrectlyExpiresTable() throws Exception {
+    // setup the mocks for the tables we will request
+    HTableFactory delegate = Mockito.mock(HTableFactory.class);
+    ImmutableBytesPtr t1 = new ImmutableBytesPtr(Bytes.toBytes("t1"));
+    ImmutableBytesPtr t2 = new ImmutableBytesPtr(Bytes.toBytes("t2"));
+    ImmutableBytesPtr t3 = new ImmutableBytesPtr(Bytes.toBytes("t3"));
+    HTableInterface table1 = Mockito.mock(HTableInterface.class);
+    HTableInterface table2 = Mockito.mock(HTableInterface.class);
+    HTableInterface table3 = Mockito.mock(HTableInterface.class);
+    Mockito.when(delegate.getTable(t1)).thenReturn(table1);
+    Mockito.when(delegate.getTable(t2)).thenReturn(table2);
+    Mockito.when(delegate.getTable(t3)).thenReturn(table3);
+    
+    // setup our factory with a cache size of 2
+    CachingHTableFactory factory = new CachingHTableFactory(delegate, 2);
+    factory.getTable(t1);
+    factory.getTable(t2);
+    factory.getTable(t3);
+    // get the same table a second time, after it has gone out of cache
+    factory.getTable(t1);
+    
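+    // t1 should have gone out of the cache (and been closed) once t2 and t3 were fetched, so the
+    // second lookup of t1 goes back to the delegate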
+    Mockito.verify(delegate, Mockito.times(2)).getTable(t1);
+    Mockito.verify(delegate, Mockito.times(1)).getTable(t2);
+    Mockito.verify(delegate, Mockito.times(1)).getTable(t3);
+    Mockito.verify(table1).close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
new file mode 100644
index 0000000..56a9ab8
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.phoenix.hbase.index.StubAbortable;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.exception.IndexWriteException;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
+import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+
+public class TestIndexWriter {
+  private static final Log LOG = LogFactory.getLog(TestIndexWriter.class);
+  @Rule
+  public TableName testName = new TableName();
+  private final byte[] row = Bytes.toBytes("row");
+
+  @Test
+  public void getDefaultWriter() throws Exception {
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+    assertNotNull(IndexWriter.getCommitter(env));
+  }
+
+  @Test
+  public void getDefaultFailurePolicy() throws Exception {
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+    assertNotNull(IndexWriter.getFailurePolicy(env));
+  }
+
+  /**
+   * With the move to using a pool of threads to write, we need to ensure that we still block until
+   * all index writes for a mutation/batch are completed.
+   * @throws Exception on failure
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSynchronouslyCompletesAllWrites() throws Exception {
+    LOG.info("Starting " + testName.getTableNameString());
+    LOG.info("Current thread is interrupted: " + Thread.interrupted());
+    Abortable abort = new StubAbortable();
+    Stoppable stop = Mockito.mock(Stoppable.class);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
+    FakeTableFactory factory = new FakeTableFactory(tables);
+
+    byte[] tableName = this.testName.getTableName();
+    Put m = new Put(row);
+    m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
+    Collection<Pair<Mutation, byte[]>> indexUpdates = Arrays.asList(new Pair<Mutation, byte[]>(m,
+        tableName));
+
+    HTableInterface table = Mockito.mock(HTableInterface.class);
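+    // single-element array so the anonymous Answer below can record that the batch was invoked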
+    final boolean[] completed = new boolean[] { false };
+    Mockito.when(table.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // just keep track that it was called
+        completed[0] = true;
+        return null;
+      }
+    });
+    Mockito.when(table.getTableName()).thenReturn(testName.getTableName());
+    // add the table to the set of tables, so its returned to the writer
+    tables.put(new ImmutableBytesPtr(tableName), table);
+
+    // setup the writer and failure policy
+    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter();
+    committer.setup(factory, exec, abort, stop, 2);
+    KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
+    policy.setup(stop, abort);
+    IndexWriter writer = new IndexWriter(committer, policy);
+    writer.write(indexUpdates);
+    assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
+      completed[0]);
+    writer.stop(this.testName.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+  }
+
+  /**
+   * Index updates can potentially be queued up if there aren't enough writer threads. If a running
+   * index write fails, then we should early-exit the pending index update when it comes up (if the
+   * pool isn't already shut down).
+   * <p>
+   * This test is a little bit racy - we could actually have the failure of the first task before
+   * the third task is even submitted. However, we should never see the third task attempt to make
+   * the batch write, so we should never see a failure here.
+   * @throws Exception on failure
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFailureOnRunningUpdateAbortsPending() throws Exception {
+    Abortable abort = new StubAbortable();
+    Stoppable stop = Mockito.mock(Stoppable.class);
+    // single thread factory so the older request gets queued
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
+    FakeTableFactory factory = new FakeTableFactory(tables);
+
+    // updates to two different tables
+    byte[] tableName = Bytes.add(this.testName.getTableName(), new byte[] { 1, 2, 3, 4 });
+    Put m = new Put(row);
+    m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
+    byte[] tableName2 = this.testName.getTableName(); // this will sort after the first tablename
+    List<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+    indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName));
+    indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName2));
+    indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName2));
+
+    // first table will fail
+    HTableInterface table = Mockito.mock(HTableInterface.class);
+    Mockito.when(table.batch(Mockito.anyList())).thenThrow(
+      new IOException("Intentional IOException for failed first write."));
+    Mockito.when(table.getTableName()).thenReturn(tableName);
+
+    // second table just blocks to make sure that the abort propagates to the third task
+    final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
+    final boolean[] failed = new boolean[] { false };
+    HTableInterface table2 = Mockito.mock(HTableInterface.class);
+    Mockito.when(table2.getTableName()).thenReturn(tableName2);
+    Mockito.when(table2.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        waitOnAbortedLatch.await();
+        return null;
+      }
+    }).thenAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        failed[0] = true;
+        throw new RuntimeException(
+            "Unexpected exception - second index table shouldn't have been written to");
+      }
+    });
+
+    // add the tables to the set of tables, so they're returned to the writer
+    tables.put(new ImmutableBytesPtr(tableName), table);
+    tables.put(new ImmutableBytesPtr(tableName2), table2);
+
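+    // setup the writer and failure policy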
+    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter();
+    committer.setup(factory, exec, abort, stop, 2);
+    KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
+    policy.setup(stop, abort);
+    IndexWriter writer = new IndexWriter(committer, policy);
+    try {
+      writer.write(indexUpdates);
+      fail("Should not have successfully completed all index writes");
+    } catch (SingleIndexWriteFailureException e) {
+      LOG.info("Correctly got a failure to reach the index", e);
+      // should have gotten the abort correctly, so let the next task execute
+      waitOnAbortedLatch.countDown();
+    }
+    assertFalse(
+      "Third set of index writes should never have been attempted - the abort should have been seen first!",
+      failed[0]);
+    writer.stop(this.testName.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+  }
+
+  /**
+   * Test that if the thread is interrupted while doing a batch (e.g. via shutdown), we correctly
+   * end the task.
+   * @throws Exception on failure
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testShutdownInterruptsAsExpected() throws Exception {
+    Stoppable stop = Mockito.mock(Stoppable.class);
+    Abortable abort = new StubAbortable();
+    // single-thread pool so the other requests get queued
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
+    FakeTableFactory factory = new FakeTableFactory(tables);
+
+    byte[] tableName = this.testName.getTableName();
+    HTableInterface table = Mockito.mock(HTableInterface.class);
+    Mockito.when(table.getTableName()).thenReturn(tableName);
+    final CountDownLatch writeStartedLatch = new CountDownLatch(1);
+    // latch never gets counted down, so we wait forever
+    final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
+    Mockito.when(table.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        LOG.info("Write started");
+        writeStartedLatch.countDown();
+        // when we interrupt the thread for shutdown, we should see this throw an interrupt too
+        try {
+          waitOnAbortedLatch.await();
+        } catch (InterruptedException e) {
+          LOG.info("Correctly interrupted while writing!");
+          throw e;
+        }
+        return null;
+      }
+    });
+    // add the table to the set of tables, so it's returned to the writer
+    tables.put(new ImmutableBytesPtr(tableName), table);
+
+    // update a single table
+    Put m = new Put(row);
+    m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
+    final List<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+    indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName));
+
+    // setup the writer
+    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter();
+    committer.setup(factory, exec, abort, stop, 2);
+    KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
+    policy.setup(stop, abort);
+    final IndexWriter writer = new IndexWriter(committer, policy);
+
+    final boolean[] failedWrite = new boolean[] { false };
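+    // run the write on a separate thread so the test thread can stop the writer while the batch is blocked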
+    Thread primaryWriter = new Thread() {
+
+      @Override
+      public void run() {
+        try {
+          writer.write(indexUpdates);
+        } catch (IndexWriteException e) {
+          failedWrite[0] = true;
+        }
+      }
+    };
+    primaryWriter.start();
+    // wait for the write to start before intentionally shutting down the pool
+    writeStartedLatch.await();
+    writer.stop("Shutting down writer for test " + this.testName.getTableNameString());
+    primaryWriter.join();
+    assertTrue("Writer should have failed because of the stop we issued", failedWrite[0]);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
new file mode 100644
index 0000000..b6331cd
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.StubAbortable;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+
+public class TestParalleIndexWriter {
+
+  private static final Log LOG = LogFactory.getLog(TestParalleIndexWriter.class);
+  @Rule
+  public TableName test = new TableName();
+  private final byte[] row = Bytes.toBytes("row");
+
+  @Test
+  public void testCorrectlyCleansUpResources() throws Exception{
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    FakeTableFactory factory = new FakeTableFactory(
+        Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
+    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter();
+    Abortable mockAbort = Mockito.mock(Abortable.class);
+    Stoppable mockStop = Mockito.mock(Stoppable.class);
+    // create a simple writer
+    writer.setup(factory, exec, mockAbort, mockStop, 1);
+    // stop the writer
+    writer.stop(this.test.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+    Mockito.verifyZeroInteractions(mockAbort, mockStop);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSynchronouslyCompletesAllWrites() throws Exception {
+    LOG.info("Starting " + test.getTableNameString());
+    LOG.info("Current thread is interrupted: " + Thread.interrupted());
+    Abortable abort = new StubAbortable();
+    Stoppable stop = Mockito.mock(Stoppable.class);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    Map<ImmutableBytesPtr, HTableInterface> tables =
+        new HashMap<ImmutableBytesPtr, HTableInterface>();
+    FakeTableFactory factory = new FakeTableFactory(tables);
+
+    ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName());
+    Put m = new Put(row);
+    m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
+    Multimap<HTableInterfaceReference, Mutation> indexUpdates =
+        ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
+    indexUpdates.put(new HTableInterfaceReference(tableName), m);
+
+    HTableInterface table = Mockito.mock(HTableInterface.class);
+    final boolean[] completed = new boolean[] { false };
+    Mockito.when(table.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // just keep track that it was called
+        completed[0] = true;
+        return null;
+      }
+    });
+    Mockito.when(table.getTableName()).thenReturn(test.getTableName());
+    // add the table to the set of tables, so it's returned to the writer
+    tables.put(tableName, table);
+
+    // setup the writer
+    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter();
+    writer.setup(factory, exec, abort, stop, 1);
+    writer.write(indexUpdates);
+    assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
+      completed[0]);
+    writer.stop(this.test.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
new file mode 100644
index 0000000..7a09d4a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.StubAbortable;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+
+public class TestParalleWriterIndexCommitter {
+
+  private static final Log LOG = LogFactory.getLog(TestParalleWriterIndexCommitter.class);
+  @Rule
+  public TableName test = new TableName();
+  private final byte[] row = Bytes.toBytes("row");
+
+  @Test
+  public void testCorrectlyCleansUpResources() throws Exception{
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    FakeTableFactory factory = new FakeTableFactory(
+        Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
+    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter();
+    Abortable mockAbort = Mockito.mock(Abortable.class);
+    Stoppable mockStop = Mockito.mock(Stoppable.class);
+    // create a simple writer
+    writer.setup(factory, exec, mockAbort, mockStop, 1);
+    // stop the writer
+    writer.stop(this.test.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+    Mockito.verifyZeroInteractions(mockAbort, mockStop);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSynchronouslyCompletesAllWrites() throws Exception {
+    LOG.info("Starting " + test.getTableNameString());
+    LOG.info("Current thread is interrupted: " + Thread.interrupted());
+    Abortable abort = new StubAbortable();
+    Stoppable stop = Mockito.mock(Stoppable.class);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    Map<ImmutableBytesPtr, HTableInterface> tables =
+        new HashMap<ImmutableBytesPtr, HTableInterface>();
+    FakeTableFactory factory = new FakeTableFactory(tables);
+
+    ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName());
+    Put m = new Put(row);
+    m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
+    Multimap<HTableInterfaceReference, Mutation> indexUpdates =
+        ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
+    indexUpdates.put(new HTableInterfaceReference(tableName), m);
+
+    HTableInterface table = Mockito.mock(HTableInterface.class);
+    final boolean[] completed = new boolean[] { false };
+    Mockito.when(table.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // just keep track that it was called
+        completed[0] = true;
+        return null;
+      }
+    });
+    Mockito.when(table.getTableName()).thenReturn(test.getTableName());
+    // add the table to the set of tables, so it's returned to the writer
+    tables.put(tableName, table);
+
+    // setup the writer
+    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter();
+    writer.setup(factory, exec, abort, stop, 1);
+    writer.write(indexUpdates);
+    assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
+      completed[0]);
+    writer.stop(this.test.getTableNameString() + " finished");
+    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
+    assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
new file mode 100644
index 0000000..4d886e5
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
+import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
+import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexSpecifierBuilder;
+import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexer;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+
+/**
+ * When a regionserver crashes, its WAL is split and then replayed to the server that takes over
+ * its regions. If the index region was present on the same server, we have to make a best effort
+ * not to kill the server for failing index writes while the index region is coming back up.
+ */
+public class TestWALRecoveryCaching {
+
+  private static final Log LOG = LogFactory.getLog(TestWALRecoveryCaching.class);
+  private static final long ONE_SEC = 1000;
+  private static final long ONE_MIN = 60 * ONE_SEC;
+  private static final long TIMEOUT = ONE_MIN;
+
+  @Rule
+  public TableName testTable = new TableName();
+
+  private String getIndexTableName() {
+    return this.testTable.getTableNameString() + "_index";
+  }
+
+  // -----------------------------------------------------------------------------------------------
+  // Warning! The classes here rely on this static. Adding multiple tests to this class and running
+  // them concurrently could have unexpected results (including, but not limited to, odd failures
+  // and flapping tests).
+  // -----------------------------------------------------------------------------------------------
+  private static CountDownLatch allowIndexTableToRecover;
+
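+  /**
+   * Region observer that blocks WAL replay for the index table until the test counts down
+   * {@link #allowIndexTableToRecover}.
+   */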
+  public static class IndexTableBlockingReplayObserver extends BaseRegionObserver {
+
+    @Override
+    public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
+        HLogKey logKey, WALEdit logEdit) throws IOException {
+      try {
+        LOG.debug("Restoring logs for index table");
+        if (allowIndexTableToRecover != null) {
+          allowIndexTableToRecover.await();
+          LOG.debug("Completed index table recovery wait latch");
+        }
+      } catch (InterruptedException e) {
+        Assert.fail("Should not be interrupted while waiting to allow the index to restore WALs.");
+      }
+    }
+  }
+
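+  /**
+   * Failure policy that releases {@link #allowIndexTableToRecover} on a failed index write and
+   * then delegates to {@link StoreFailuresInCachePolicy}.
+   */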
+  public static class ReleaseLatchOnFailurePolicy extends StoreFailuresInCachePolicy {
+
+    /**
+     * @param failedIndexEdits
+     */
+    public ReleaseLatchOnFailurePolicy(PerRegionIndexWriteCache failedIndexEdits) {
+      super(failedIndexEdits);
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+        Exception cause) throws IOException {
+      LOG.debug("Found index update failure!");
+      if (allowIndexTableToRecover != null) {
+        LOG.info("failed index write on WAL recovery - allowing index table to be restored.");
+        allowIndexTableToRecover.countDown();
+      }
+      super.handleFailure(attempted, cause);
+    }
+
+  }
+
+  //TODO: Jesse to fix
+  @Ignore("Configuration issue - valid test, just needs fixing")
+  @Test
+  public void testWaitsOnIndexRegionToReload() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+
+    // set up other useful test configuration
+    IndexTestingUtils.setupConfig(conf);
+    conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+
+    // make sure everything is setup correctly
+    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+
+    // start the cluster with 2 region servers
+    util.startMiniCluster(2);
+
+    HBaseAdmin admin = util.getHBaseAdmin();
+    // setup the index
+    byte[] family = Bytes.toBytes("family");
+    byte[] qual = Bytes.toBytes("qualifier");
+    byte[] nonIndexedFamily = Bytes.toBytes("nonIndexedFamily");
+    String indexedTableName = getIndexTableName();
+    ColumnGroup columns = new ColumnGroup(indexedTableName);
+    columns.add(new CoveredColumn(family, qual));
+    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+    builder.addIndexGroup(columns);
+
+    // create the primary table w/ indexing enabled
+    HTableDescriptor primaryTable = new HTableDescriptor(testTable.getTableName());
+    primaryTable.addFamily(new HColumnDescriptor(family));
+    primaryTable.addFamily(new HColumnDescriptor(nonIndexedFamily));
+    builder.addArbitraryConfigForTesting(Indexer.RecoveryFailurePolicyKeyForTesting,
+      ReleaseLatchOnFailurePolicy.class.getName());
+    builder.build(primaryTable);
+    admin.createTable(primaryTable);
+
+    // create the index table
+    HTableDescriptor indexTableDesc = new HTableDescriptor(Bytes.toBytes(getIndexTableName()));
+    indexTableDesc.addCoprocessor(IndexTableBlockingReplayObserver.class.getName());
+    CoveredColumnIndexer.createIndexTable(admin, indexTableDesc);
+
+    // figure out where our tables live
+    ServerName shared =
+        ensureTablesLiveOnSameServer(util.getMiniHBaseCluster(), Bytes.toBytes(indexedTableName),
+          testTable.getTableName());
+
+    // load some data into the table
+    Put p = new Put(Bytes.toBytes("row"));
+    p.add(family, qual, Bytes.toBytes("value"));
+    HTable primary = new HTable(conf, testTable.getTableName());
+    primary.put(p);
+    primary.flushCommits();
+
+    // turn on the recovery latch
+    allowIndexTableToRecover = new CountDownLatch(1);
+
+    // kill the server where the tables live - this should trigger distributed log splitting
+    // find the regionserver that matches the passed server
+    List<HRegion> online = new ArrayList<HRegion>();
+    online.addAll(getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared,
+      testTable.getTableName()));
+    online.addAll(getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared,
+      Bytes.toBytes(indexedTableName)));
+
+    // log all the current state of the server
+    LOG.info("Current Server/Region paring: ");
+    for (RegionServerThread t : util.getMiniHBaseCluster().getRegionServerThreads()) {
+      // check all the conditions for the server to be done
+      HRegionServer server = t.getRegionServer();
+      if (server.isStopping() || server.isStopped() || server.isAborted()) {
+        LOG.info("\t== Offline: " + server.getServerName());
+        continue;
+      }
+      List<HRegionInfo> regions = server.getOnlineRegions();
+      LOG.info("\t" + server.getServerName() + " regions: " + regions);
+    }
+
+    LOG.debug("Killing server " + shared);
+    util.getMiniHBaseCluster().killRegionServer(shared);
+    LOG.debug("Waiting on server " + shared + "to die");
+    util.getMiniHBaseCluster().waitForRegionServerToStop(shared, TIMEOUT);
+    // force reassign the regions from the table
+    // LOG.debug("Forcing region reassignment from the killed server: " + shared);
+    // for (HRegion region : online) {
+    // util.getMiniHBaseCluster().getMaster().assign(region.getRegionName());
+    // }
+    System.out.println(" ====== Killed shared server ==== ");
+
+    // make a second put that (1) isn't indexed, so we can be sure of the index state, and (2)
+    // ensures that our table is back up
+    Put p2 = new Put(p.getRow());
+    p2.add(nonIndexedFamily, Bytes.toBytes("Not indexed"), Bytes.toBytes("non-indexed value"));
+    primary.put(p2);
+    primary.flushCommits();
+
+    // make sure that we actually failed the write once (within a 5 minute window)
+    assertTrue("Didn't find an error writing to index table within timeout!",
+      allowIndexTableToRecover.await(ONE_MIN * 5, TimeUnit.MILLISECONDS));
+
+    // scan the index to make sure it has the one entry (which had to be replayed from the WAL,
+    // since we hard-killed the server)
+    Scan s = new Scan();
+    HTable index = new HTable(conf, getIndexTableName());
+    ResultScanner scanner = index.getScanner(s);
+    int count = 0;
+    for (Result r : scanner) {
+      LOG.info("Got index table result:" + r);
+      count++;
+    }
+    assertEquals("Got an unexpected found of index rows", 1, count);
+
+    // cleanup
+    scanner.close();
+    index.close();
+    primary.close();
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * @param cluster mini cluster hosting the region servers
+   * @param server server whose regions we want
+   * @param table table whose regions to collect
+   * @return the regions of the given table hosted on the given server
+   */
+  private List<HRegion> getRegionsFromServerForTable(MiniHBaseCluster cluster, ServerName server,
+      byte[] table) {
+    List<HRegion> online = Collections.emptyList();
+    for (RegionServerThread rst : cluster.getRegionServerThreads()) {
+      // if it's the server we are going to kill, get the regions we want to reassign
+      if (rst.getRegionServer().getServerName().equals(server)) {
+        online = rst.getRegionServer().getOnlineRegions(table);
+        break;
+      }
+    }
+    return online;
+  }
+
+  /**
+   * @param cluster mini cluster hosting the tables
+   * @param indexTable name of the index table
+   * @param primaryTable name of the primary table
+   * @return a server hosting regions of both tables
+   */
+  private ServerName ensureTablesLiveOnSameServer(MiniHBaseCluster cluster, byte[] indexTable,
+      byte[] primaryTable) throws Exception {
+
+    ServerName shared = getSharedServer(cluster, indexTable, primaryTable);
+    boolean tryIndex = true;
+    while (shared == null) {
+
+      // start killing servers until we get an overlap
+      Set<ServerName> servers;
+      byte[] table = null;
+      // switch which server we kill each time to get region movement
+      if (tryIndex) {
+        table = indexTable;
+      } else {
+        table = primaryTable;
+      }
+      servers = getServersForTable(cluster, table);
+      tryIndex = !tryIndex;
+      for (ServerName server : servers) {
+        // find the regionserver that matches the passed server
+        List<HRegion> online = getRegionsFromServerForTable(cluster, server, table);
+
+        LOG.info("Shutting down and reassigning regions from " + server);
+        cluster.stopRegionServer(server);
+        cluster.waitForRegionServerToStop(server, TIMEOUT);
+
+        // force reassign the regions from the table
+        for (HRegion region : online) {
+          cluster.getMaster().assign(region.getRegionName());
+        }
+
+        LOG.info("Starting region server:" + server.getHostname());
+        cluster.startRegionServer(server.getHostname());
+
+        cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
+
+        // start a server to get back to the base number of servers
+        LOG.info("STarting server to replace " + server);
+        cluster.startRegionServer();
+        break;
+      }
+
+      shared = getSharedServer(cluster, indexTable, primaryTable);
+    }
+    return shared;
+  }
+
+  /**
+   * @param cluster mini cluster hosting the tables
+   * @param indexTable name of the index table
+   * @param primaryTable name of the primary table
+   * @return a server hosting regions of both tables, or <tt>null</tt> if there is no overlap
+   * @throws Exception on failure
+   */
+  private ServerName getSharedServer(MiniHBaseCluster cluster, byte[] indexTable,
+      byte[] primaryTable) throws Exception {
+    Set<ServerName> indexServers = getServersForTable(cluster, indexTable);
+    Set<ServerName> primaryServers = getServersForTable(cluster, primaryTable);
+
+    Set<ServerName> joinSet = new HashSet<ServerName>(indexServers);
+    joinSet.addAll(primaryServers);
+    // if there is already an overlap, then find it and return it
+    if (joinSet.size() < indexServers.size() + primaryServers.size()) {
+      // find the first overlapping server
+      for (ServerName server : joinSet) {
+        if (indexServers.contains(server) && primaryServers.contains(server)) {
+          return server;
+        }
+      }
+      throw new RuntimeException(
+          "Couldn't find a matching server on which both the primary and index table live, "
+              + "even though they have overlapping server sets");
+    }
+    return null;
+  }
+
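+  /** Collect the distinct region servers currently hosting regions of the given table. */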
+  private Set<ServerName> getServersForTable(MiniHBaseCluster cluster, byte[] table)
+      throws Exception {
+    List<HRegion> indexRegions = cluster.getRegions(table);
+    Set<ServerName> indexServers = new HashSet<ServerName>();
+    for (HRegion region : indexRegions) {
+      indexServers.add(cluster.getServerHoldingRegion(region.getRegionName()));
+    }
+    return indexServers;
+  }
+}
\ No newline at end of file

