phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [4/7] phoenix git commit: PHOENIX-4070 Delete row should mask upserts at same timestamp
Date Tue, 08 Aug 2017 05:08:49 GMT
PHOENIX-4070 Delete row should mask upserts at same timestamp


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/78f35a65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/78f35a65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/78f35a65

Branch: refs/heads/master
Commit: 78f35a65d0cc55014c51da70c847969a2ea3c5a6
Parents: 27170bd
Author: James Taylor <jamestaylor@apache.org>
Authored: Mon Aug 7 11:41:15 2017 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Mon Aug 7 22:08:36 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/OutOfOrderMutationsIT.java  | 159 ++++++--
 .../end2end/index/MutableIndexFailureIT.java    |  27 +-
 .../end2end/index/PartialIndexRebuilderIT.java  | 366 +++++++++++++++++++
 .../coprocessor/MetaDataRegionObserver.java     |  58 +--
 .../apache/phoenix/hbase/index/ValueGetter.java |  10 +-
 .../hbase/index/covered/LocalTableState.java    |  10 +-
 .../index/covered/data/LazyValueGetter.java     |  19 +-
 .../example/CoveredColumnIndexCodec.java        |   5 +-
 .../filter/ApplyAndFilterDeletesFilter.java     |   7 +-
 .../hbase/index/scanner/EmptyScanner.java       |  16 +-
 .../hbase/index/scanner/ScannerBuilder.java     |  23 +-
 .../hbase/index/util/IndexManagementUtil.java   |   4 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  32 +-
 .../index/PhoenixIndexFailurePolicy.java        |  57 +--
 .../index/PhoenixTransactionalIndexer.java      |   2 +-
 .../phoenix/schema/tuple/ValueGetterTuple.java  |  12 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  45 ++-
 .../index/covered/TestLocalTableState.java      |   9 +-
 .../phoenix/index/IndexMaintainerTest.java      |   5 +-
 .../java/org/apache/phoenix/util/TestUtil.java  | 140 +++++++
 20 files changed, 811 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
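For readers skimming the diff: the subject line means that when a client deletes a row and then upserts it again at the same timestamp (for example, through connections pinned to the same CURRENT_SCN), the delete marker must win, and the secondary index must stay consistent with the data table. The new OutOfOrderMutationsIT tests below exercise exactly this. The following is a condensed sketch of that pattern, assuming a table and index have already been created as in those tests; the table/index names are placeholders and the usual ParallelStatsDisabledIT test-harness imports are assumed. TestUtil.scutinizeIndex is the scrutiny helper added to TestUtil.java by this commit.

    // Sketch of the PHOENIX-4070 scenario; mirrors testDeleteRowAndUpsertValueAtSameTS1 below.
    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1040L));
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        // Delete the row, then upsert it again at the same SCN.
        conn.createStatement().execute("DELETE FROM T WHERE (K1,K2) = ('aa','aa')");
        conn.commit();
        conn.createStatement().execute("UPSERT INTO T VALUES('aa','aa', null, '3')");
        conn.commit();
    }
    // At a later SCN, both the data table and the index must show zero rows:
    // the delete at ts=1040 masks the upsert issued at the same timestamp.
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1050L));
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        long rowCount = TestUtil.scutinizeIndex(conn, "T", "I");
        assertEquals(0, rowCount);
    }
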


http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
index 3cf7336..0e038e2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
@@ -91,13 +91,9 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
         conn = DriverManager.getConnection(getUrl(), props);
         
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals(0, count1);
-        assertEquals(0, count2);
-        conn.close();
-        
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         assertNoTimeStampAt(conn, indexName, 1030);
+        conn.close();
         
         /**
          *
@@ -179,11 +175,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
         conn = DriverManager.getConnection(getUrl(), props);
         
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -196,6 +188,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
 
         assertNoTimeStampAt(conn, indexName, 1030);
+        conn.close();
 
         /**
          *
@@ -272,11 +265,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -287,6 +276,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(expectedTimestamp, rs.getTimestamp(1));
         assertFalse(rs.next());
+        conn.close();
     }
 
     @Test
@@ -341,11 +331,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -356,6 +342,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(expectedTimestamp, rs.getTimestamp(1));
         assertFalse(rs.next());
+        conn.close();
     }    
     
     @Test
@@ -406,11 +393,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -423,6 +406,8 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertEquals(expectedTimestamp, rs.getTimestamp(1));
         assertEquals(null, rs.getString(2));
         assertFalse(rs.next());
+        
+        conn.close();
     }
 
     @Test
@@ -473,11 +458,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -490,6 +471,8 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertEquals(expectedTimestamp, rs.getTimestamp(1));
         assertEquals(null, rs.getString(2));
         assertFalse(rs.next());
+        
+        conn.close();
     }
     
     @Test
@@ -540,11 +523,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long count1 = getRowCount(conn, tableName);
-        long count2 = getRowCount(conn, indexName);
-        assertEquals("Table should have 1 row", 1, count1);
-        assertEquals("Index should have 1 row", 1, count2);
-        conn.close();
+        TestUtil.scutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -557,5 +536,113 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         assertEquals(expectedTimestamp, rs.getTimestamp(1));
         assertEquals(null, rs.getString(2));
         assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, A.V VARCHAR, B.V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0','1')");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        Timestamp expectedTimestamp;
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE (K1,K2) = ('aa','aa')");
+        stmt.executeUpdate();
+        conn.commit();
+        expectedTimestamp = new Timestamp(3000L);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null,'3')");
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long rowCount = TestUtil.scutinizeIndex(conn, tableName, indexName);
+        assertEquals(0,rowCount);
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testDeleteRowAndUpsertValueAtSameTS2() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0')");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        Timestamp expectedTimestamp;
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        expectedTimestamp = new Timestamp(3000L);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)");
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE (K1,K2) = ('aa','aa')");
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long rowCount = TestUtil.scutinizeIndex(conn, tableName, indexName);
+        assertEquals(0,rowCount);
+        
+        conn.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 86a778d..1d47eaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -48,7 +48,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -63,6 +62,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -324,31 +324,12 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     private void waitForIndexRebuild(Connection conn, String index, PIndexState expectedIndexState) throws InterruptedException, SQLException {
-        boolean isActive = false;
         if (!transactional) {
-            int maxTries = 12, nTries = 0;
-            do {
-                Thread.sleep(5 * 1000); // sleep 5 secs
-                String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
-                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
-                        + ") = (" + "'" + schema + "','" + index + "') "
-                        + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL"
-                        + " AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + expectedIndexState.getSerializedValue() + "'";
-                ResultSet rs = conn.createStatement().executeQuery(query);
-                assertTrue(rs.next());
-                if (expectedIndexState == PIndexState.ACTIVE) {
-                    if (rs.getLong(1) == 0 && !rs.wasNull()) {
-                        isActive = true;
-                        break;
-                    }
-                }
-            } while (++nTries < maxTries);
-            if (expectedIndexState == PIndexState.ACTIVE) {
-                assertTrue(isActive);
-            }
+            String fullIndexName = SchemaUtil.getTableName(schema, index);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState);
         }
     }
-
+    
     private void initializeTable(Connection conn, String tableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
         stmt.setString(1, "a");

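The inline polling loop deleted above moves into a shared TestUtil.waitForIndexRebuild helper (added in TestUtil.java as part of this commit, not shown in this message). Reconstructed from the removed code, the helper presumably looks roughly like the following sketch rather than being a verbatim copy of the commit:

    // Rough sketch reconstructed from the loop removed above; the actual helper lives in TestUtil.java.
    public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState)
            throws InterruptedException, SQLException {
        String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
        String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
        boolean isActive = false;
        int maxTries = 12, nTries = 0;
        do {
            Thread.sleep(5 * 1000); // poll every 5 seconds
            String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM "
                    + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM
                    + "," + PhoenixDatabaseMetaData.TABLE_NAME + ") = ('" + schema + "','" + index + "') AND "
                    + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME
                    + " IS NULL AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '"
                    + expectedIndexState.getSerializedValue() + "'";
            ResultSet rs = conn.createStatement().executeQuery(query);
            assertTrue(rs.next());
            // The rebuilder resets INDEX_DISABLE_TIMESTAMP to 0 once the index is ACTIVE again.
            if (expectedIndexState == PIndexState.ACTIVE && rs.getLong(1) == 0 && !rs.wasNull()) {
                isActive = true;
                break;
            }
        } while (++nTries < maxTries);
        if (expectedIndexState == PIndexState.ACTIVE) {
            assertTrue(isActive);
        }
    }
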
http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
new file mode 100644
index 0000000..ef9ae1b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "30000"); // give up rebuilding after 30 seconds
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
+    }
+
+
+    @Test
+    public void testRowCountIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+            conn.commit();
+            
+            int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'");
+            assertEquals(1,count);
+            conn.commit();
+            try {
+                TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected data table row count to match expected:<1> but was:<2>");
+            }
+        }
+    }
+    @Test
+    public void testExtraRowIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')");
+            conn.commit();
+            try {
+                TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')");
+            }
+        }
+    }
+    
+    @Test
+    public void testValuetIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')");
+            conn.commit();
+            try {
+                TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'");
+            }
+        }
+    }
+
+    @Test
+    public void testMultiVersionsAfterFailure() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+            conn.commit();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd')");
+            conn.commit();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
+            conn.commit();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testUpsertNullAfterFailure() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
+            conn.commit();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')");
+            conn.commit();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+            conn.commit();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testUpsertNullTwiceAfterFailure() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')");
+            conn.commit();
+            conn.createStatement().execute("DELETE FROM " + fullTableName);
+            conn.commit();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
+            conn.commit();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testDeleteAfterFailure() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
+            conn.commit();
+            conn.createStatement().execute("DELETE FROM " + fullTableName);
+            conn.commit();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+       }
+    }
+    
+    @Test
+    public void testDeleteBeforeFailure() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
+            conn.commit();
+            conn.createStatement().execute("DELETE FROM " + fullTableName);
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
+            conn.commit();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testMultiValuesAtSameTS() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS));
+            try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+                conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')");
+                conn2.commit();
+                conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+                conn2.commit();
+            }
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS));
+            try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+                conn2.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'");
+                conn2.commit();
+                conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+                conn2.commit();
+            }
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+    
+    @Test
+    public void testDeleteAndUpsertValuesAtSameTS2() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')");
+            conn.commit();
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS));
+            try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+                conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
+                conn2.commit();
+                conn2.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'");
+                conn2.commit();
+            }
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
+
+            TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 5cfacfc..9b68cd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -59,10 +58,6 @@ import org.apache.log4j.Logger;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -77,11 +72,10 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -90,10 +84,8 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.protobuf.ServiceException;
 
 
 /**
@@ -326,8 +318,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                          * rebuild task won't pick up this index again for rebuild.
                          */
                         try {
-                            updateIndexState(conn, indexTableFullName, env, state,
-                                PIndexState.DISABLE, 0l);
+                            IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, 0l);
                             LOG.error("Unable to rebuild index " + indexTableFullName
                                     + ". Won't attempt again since index disable timestamp is older than current time by "
                                     + indexDisableTimestampThreshold
@@ -342,7 +333,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     // Allow index to begin incremental maintenance as index is back online and we
                     // cannot transition directly from DISABLED -> ACTIVE
                     if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
-                        updateIndexState(conn, indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE, null);
+                        IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null);
                     }
                     List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
                     if (indexesToPartiallyRebuild == null) {
@@ -409,7 +400,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
 							long scanEndTime = getTimestampForBatch(timeStamp,
 									batchExecutedPerTableMap.get(dataPTable.getName()));
-							
+							// We can't allow partial results
+							dataTableScan.setAllowPartialResults(false);
 							dataTableScan.setTimeRange(timeStamp, scanEndTime);
 							dataTableScan.setCacheBlocks(false);
 							dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
@@ -436,8 +428,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 										indexPTable.getSchemaName().getString(),
 										indexPTable.getTableName().getString());
 								if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
-									updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
-											PIndexState.ACTIVE, 0l);
+									IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 0l);
 									batchExecutedPerTableMap.remove(dataPTable.getName());
                                     LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
 								} else {
@@ -499,43 +490,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         }
     }
     
-	private static void updateIndexState(PhoenixConnection conn, String indexTableName,
-			RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState, Long indexDisableTimestamp)
-					throws ServiceException, Throwable {
-        if (newState == PIndexState.ACTIVE) {
-            Preconditions.checkArgument(indexDisableTimestamp == 0,
-                "Index disable timestamp has to be 0 when marking an index as active");
-        }
-		byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-		String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
-		String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
-		// Mimic the Put that gets generated by the client on an update of the
-		// index state
-		Put put = new Put(indexTableKey);
-		put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-				newState.getSerializedBytes());
-        if (indexDisableTimestamp != null) {
-            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                PLong.INSTANCE.toBytes(indexDisableTimestamp));
-        }
-        if (newState == PIndexState.ACTIVE) {
-            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
-        }
-		final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
-		MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
-		MutationCode code = result.getMutationCode();
-		if (code == MutationCode.TABLE_NOT_FOUND) {
-			throw new TableNotFoundException(schemaName, indexName);
-		}
-		if (code == MutationCode.UNALLOWED_TABLE_MUTATION) {
-			throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
-					.setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
-					.setTableName(indexName).build().buildException();
-		}
-	}
-
 	private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
 			RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException {
 		byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index af847b7..8c75424 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -23,16 +23,18 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 
 public interface ValueGetter {
-
+  public static final ImmutableBytesWritable HIDDEN_BY_DELETE = new ImmutableBytesWritable(new byte[0]);
   /**
    * Get the most recent (largest timestamp) for the given column reference
    * @param ref to match against an underlying key value. Uses the passed object to match the
    *          keyValue via {@link ColumnReference#matches}
-   * @return the stored value for the given {@link ColumnReference}, or <tt>null</tt> if no value is
-   *         present.
+ * @param ts time stamp at which mutations will be issued
+   * @return the stored value for the given {@link ColumnReference}, <tt>null</tt> if no value is
+   *         present, or {@link ValueGetter#HIDDEN_BY_DELETE} if no value is present and the ref
+   *         will be shadowed by a delete marker.
    * @throws IOException if there is an error accessing the underlying data storage
    */
-  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException;
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException;
   
   public byte[] getRowKey();
 

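This interface change is the crux of the fix: getLatestValue() now receives the timestamp of the mutation being processed, and in addition to returning a value or null it can return the HIDDEN_BY_DELETE sentinel when a row-delete marker at that timestamp shadows the column. A minimal, hypothetical illustration of how a caller can distinguish the three outcomes (the identifiers come from the diff; the branches only restate the javadoc):

    ImmutableBytesWritable value = getter.getLatestValue(ref, mutationTs);
    if (value == ValueGetter.HIDDEN_BY_DELETE) {
        // No value at mutationTs: a delete marker issued at that timestamp masks the column.
    } else if (value == null) {
        // No value stored for the column at all.
    } else {
        // Most recent stored value (largest timestamp) for the column.
    }
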
http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index acbf1ab..0f5a9f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -30,8 +30,8 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 /**
@@ -163,7 +163,7 @@ public class LocalTableState implements TableState {
      *         {@link IndexUpdate}.
      * @throws IOException
      */
-    public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+    public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState(
         Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) throws IOException {
         ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations, indexMetaData);
         // filter out things with a newer timestamp and track the column references to which it applies
@@ -175,9 +175,9 @@ public class LocalTableState implements TableState {
             }
         }
 
-        Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound);
+        CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound);
 
-        return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+        return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
     }
 
     /**
@@ -266,7 +266,7 @@ public class LocalTableState implements TableState {
     @Override
     public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
             throws IOException {
-        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData);
+        Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData);
         ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
         return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 52076a2..bafefce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -24,11 +24,12 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
@@ -37,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
  */
 public class LazyValueGetter implements ValueGetter {
 
-  private Scanner scan;
+  private CoveredDeleteScanner scan;
   private volatile Map<ColumnReference, ImmutableBytesWritable> values;
   private byte[] row;
   
@@ -46,13 +47,13 @@ public class LazyValueGetter implements ValueGetter {
    * @param scan backing scanner
    * @param currentRow row key for the row to seek in the scanner
    */
-  public LazyValueGetter(Scanner scan, byte[] currentRow) {
+  public LazyValueGetter(CoveredDeleteScanner scan, byte[] currentRow) {
     this.scan = scan;
     this.row = currentRow;
   }
 
   @Override
-  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
     // ensure we have a backing map
     if (values == null) {
       synchronized (this) {
@@ -64,6 +65,14 @@ public class LazyValueGetter implements ValueGetter {
     ImmutableBytesWritable value = values.get(ref);
     if (value == null) {
       value = get(ref);
+      DeleteTracker deleteTracker = scan.getDeleteTracker();
+      if (value == null) {
+          // Delete family is used for row deletion. Family won't necessarily match as we'll be at
+          // the delete family marker on the last column family if there is one.
+          if (deleteTracker.deleteFamily != null && deleteTracker.deleteFamily.getTimestamp() == ts) {
+              value = HIDDEN_BY_DELETE;
+          }
+      }
       values.put(ref, value);
     }
 
@@ -81,7 +90,7 @@ public class LazyValueGetter implements ValueGetter {
     }
     // there is a next value - we only care about the current value, so we can just snag that
     Cell next = scan.next();
-    if (ref.matches(KeyValueUtil.ensureKeyValue(next))) {
+    if (ref.matches(next)) {
       return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength());
     }
     return null;
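
The hunk above carries the core of the fix: when a column lookup misses and the scanner's delete tracker holds a delete-family marker whose timestamp equals the timestamp being evaluated, the value is reported as hidden rather than simply absent. A minimal standalone sketch of that rule, using placeholder names rather than the Phoenix classes (the real sentinel is the HIDDEN_BY_DELETE constant referenced in the diff):

    import java.util.HashMap;
    import java.util.Map;

    class MaskingValueGetter {
        // Sentinel meaning "a value may exist, but a row delete at this timestamp masks it".
        static final byte[] HIDDEN_BY_DELETE = new byte[0];

        private final Map<String, byte[]> storedValues = new HashMap<>();
        private final Long deleteFamilyTimestamp; // null when no row-delete marker was seen

        MaskingValueGetter(Map<String, byte[]> storedValues, Long deleteFamilyTimestamp) {
            this.storedValues.putAll(storedValues);
            this.deleteFamilyTimestamp = deleteFamilyTimestamp;
        }

        byte[] getLatestValue(String column, long ts) {
            byte[] value = storedValues.get(column);
            if (value == null && deleteFamilyTimestamp != null && deleteFamilyTimestamp == ts) {
                // The row was deleted at exactly this timestamp, so an upsert at the same
                // timestamp must be masked rather than treated as a brand-new row.
                return HIDDEN_BY_DELETE;
            }
            return value;
        }
    }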

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 1392906..c24d730 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 
 import com.google.common.collect.Lists;
 
@@ -77,7 +78,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) {
         List<CoveredColumn> refs = group.getColumns();
         try {
-            Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
+            Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
             Scanner kvs = stateInfo.getFirst();
             Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
             // make sure we close the scanner
@@ -132,7 +133,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) {
         List<CoveredColumn> refs = group.getColumns();
         try {
-            Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
+            Pair<CoveredDeleteScanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
             Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
             // make sure we close the scanner reference
             kvs.getFirst().close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
index 1f66e7c..a1f01ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
@@ -70,6 +70,9 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
     Collections.sort(this.families);
   }
       
+  public DeleteTracker getDeleteTracker() {
+      return coveringDelete;
+  }
   
   private ImmutableBytesPtr getNextFamily(ImmutableBytesPtr family) {
     int index = Collections.binarySearch(families, family);
@@ -209,7 +212,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
     }
   }
 
-  class DeleteTracker {
+  public static class DeleteTracker {
 
     public KeyValue deleteFamily;
     public KeyValue deleteColumn;
@@ -283,7 +286,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
           return true;
         }
         // clear the point delete since the TS must not be matching
-        coveringDelete.pointDelete = null;
+        pointDelete = null;
       }
       return false;
     }
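
Two smaller details in this file go together: DeleteTracker becomes a public static nested class so the scanner layer can hand the tracker out (see the new getDeleteTracker() above), and the assignment switches from coveringDelete.pointDelete to the bare field because a static nested class has no implicit reference to the enclosing filter instance. A small, self-contained illustration of that Java rule, with invented names unrelated to the Phoenix types:

    class DeletesFilterSketch {
        private final Tracker coveringDelete = new Tracker();

        // public static: visible to other classes and free of any implicit
        // reference to the enclosing filter instance.
        public static class Tracker {
            Long pointDeleteTs; // timestamp of a point delete, if one was seen

            boolean matches(long ts) {
                if (pointDeleteTs != null && pointDeleteTs == ts) {
                    return true;
                }
                // inside the (now static) class, the field is referenced directly
                pointDeleteTs = null;
                return false;
            }
        }

        Tracker getDeleteTracker() {
            return coveringDelete;
        }
    }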

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index 884cca6..1c36ebb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -21,13 +21,20 @@ package org.apache.phoenix.hbase.index.scanner;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 
 
 /**
  * {@link Scanner} that has no underlying data
  */
-public class EmptyScanner implements Scanner {
-
+public class EmptyScanner implements CoveredDeleteScanner {
+  private final DeleteTracker deleteTracker;
+  
+  public EmptyScanner (DeleteTracker deleteTracker) {
+      this.deleteTracker = deleteTracker;
+  }
+  
   @Override
   public Cell next() throws IOException {
     return null;
@@ -47,4 +54,9 @@ public class EmptyScanner implements Scanner {
   public void close() throws IOException {
     // noop
   }
+
+  @Override
+  public DeleteTracker getDeleteTracker() {
+    return deleteTracker;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 301929c..5547958 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.covered.KeyValueStore;
 import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker;
 import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
@@ -57,7 +58,7 @@ public class ScannerBuilder {
     this.update = update;
   }
 
-  public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts, boolean returnNullIfRowNotFound) {
+  public CoveredDeleteScanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts, boolean returnNullIfRowNotFound) {
 
     Filter columnFilters = getColumnFilters(indexedColumns);
     FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
@@ -68,10 +69,11 @@ public class ScannerBuilder {
     filters.addFilter(new ColumnTrackingNextLargestTimestampFilter(ts, tracker));
 
     // filter out kvs based on deletes
-    filters.addFilter(new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns)));
+    ApplyAndFilterDeletesFilter deleteFilter = new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns));
+    filters.addFilter(deleteFilter);
 
     // combine the family filters and the rest of the filters as a
-    return getFilteredScanner(filters, returnNullIfRowNotFound);
+    return getFilteredScanner(filters, returnNullIfRowNotFound, deleteFilter.getDeleteTracker());
   }
 
   /**
@@ -108,14 +110,18 @@ public class ScannerBuilder {
     return families;
   }
 
-  private Scanner getFilteredScanner(Filter filters, boolean returnNullIfRowNotFound) {
+  public static interface CoveredDeleteScanner extends Scanner {
+      public DeleteTracker getDeleteTracker();
+  }
+  
+  private CoveredDeleteScanner getFilteredScanner(Filter filters, boolean returnNullIfRowNotFound, final DeleteTracker deleteTracker) {
     // create a scanner and wrap it as an iterator, meaning you can only go forward
     final FilteredKeyValueScanner kvScanner = new FilteredKeyValueScanner(filters, memstore);
     // seek the scanner to initialize it
     KeyValue start = KeyValueUtil.createFirstOnRow(update.getRow());
     try {
       if (!kvScanner.seek(start)) {
-        return returnNullIfRowNotFound ? null : new EmptyScanner();
+        return returnNullIfRowNotFound ? null : new EmptyScanner(deleteTracker);
       }
     } catch (IOException e) {
       // This should never happen - everything should explode if so.
@@ -124,7 +130,7 @@ public class ScannerBuilder {
     }
 
     // we have some info in the scanner, so wrap it in an iterator and return.
-    return new Scanner() {
+    return new CoveredDeleteScanner() {
 
       @Override
       public Cell next() {
@@ -162,6 +168,11 @@ public class ScannerBuilder {
       public void close() throws IOException {
         kvScanner.close();
       }
+
+      @Override
+      public DeleteTracker getDeleteTracker() {
+        return deleteTracker;
+      }
     };
   }
 }
\ No newline at end of file
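
The new CoveredDeleteScanner interface is the plumbing that carries the DeleteTracker from the delete filter out to whoever consumes the scanner (LazyValueGetter in this commit). The shape of the change, reduced to a standalone sketch with placeholder types rather than the Phoenix or HBase ones:

    import java.util.Iterator;

    interface SketchScanner {
        String next();
        void close();
    }

    // Scanner that also exposes the delete state observed while filtering.
    interface SketchCoveredDeleteScanner extends SketchScanner {
        SketchDeleteTracker getDeleteTracker();
    }

    class SketchDeleteTracker {
        Long deleteFamilyTs; // timestamp of a row-level (delete family) marker, if any
    }

    class SketchScannerBuilder {
        SketchCoveredDeleteScanner build(final Iterator<String> cells, final SketchDeleteTracker tracker) {
            // The tracker is captured by the anonymous scanner, mirroring how the real
            // builder threads deleteFilter.getDeleteTracker() into getFilteredScanner().
            return new SketchCoveredDeleteScanner() {
                @Override public String next() { return cells.hasNext() ? cells.next() : null; }
                @Override public SketchDeleteTracker getDeleteTracker() { return tracker; }
                @Override public void close() { /* nothing to release in this sketch */ }
            };
        }
    }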

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index a60adef..6582c8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -32,7 +32,7 @@ import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 
 /**
  * Utility class to help manage indexes
@@ -97,7 +97,7 @@ public class IndexManagementUtil {
 
     }
 
-    public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) {
+    public static ValueGetter createGetterFromScanner(CoveredDeleteScanner scanner, byte[] currentRow) {
         return scanner!=null ? new LazyValueGetter(scanner, currentRow) : null;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 3b4faa9..840d535 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -548,7 +548,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         initCachedState();
     }
     
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey)  {
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
         boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0;
@@ -620,7 +620,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 	dataColumnType = expression.getDataType();
                 	dataSortOrder = expression.getSortOrder();
                     isNullable = expression.isNullable();
-                	expression.evaluate(new ValueGetterTuple(valueGetter), ptr);
+                	expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
                 }
                 else {
                     Field field = dataRowKeySchema.getField(dataPkPosition[i]);
@@ -932,10 +932,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
     
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
-        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
         Put put = null;
         // New row being inserted: add the empty key value
-        if (valueGetter==null || valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) {
+        ImmutableBytesWritable latestValue = null;
+        if (valueGetter==null || (latestValue = valueGetter.getLatestValue(dataEmptyKeyValueRef, ts)) == null || latestValue == ValueGetter.HIDDEN_BY_DELETE) {
+            // We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp.
+            // If it is, these Puts will be masked so should not be emitted.
+            if (latestValue == ValueGetter.HIDDEN_BY_DELETE) {
+                return null;
+            }
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
@@ -997,7 +1003,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         }
                     }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme);
                     ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-                    expression.evaluate(new ValueGetterTuple(valueGetter), ptr);
+                    expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
                     byte[] value = ptr.copyBytesIfNecessary();
                     if (value != null) {
                         int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
@@ -1023,8 +1029,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
                 ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
                 ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
-                ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
-                if (value != null) {
+                ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
+                if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
                     if (put == null) {
                         put = new Put(indexRowKey);
                         put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
@@ -1068,7 +1074,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return getDeleteTypeOrNull(pendingUpdates) != null;
     }
     
-    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException {
+    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates, long ts) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
@@ -1079,8 +1085,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         for (ColumnReference ref : indexedColumns) {
         	Cell newValue = newState.get(ref);
         	if (newValue != null) { // Indexed column has potentially changed
-        	    ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
-        		boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
+        	    ImmutableBytesWritable oldValue = oldState.getLatestValue(ref, ts);
+                boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
         		boolean oldValueSetAsNull = oldValue == null || oldValue.getLength() == 0;
         		//If the new column value has to be set as null and the older value is null too,
         		//then just skip to the next indexed column.
@@ -1109,10 +1115,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
     
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
-        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
+        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
         // Delete the entire row if any of the indexed columns changed
         DeleteType deleteType = null;
-        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
+        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire row
             byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
             Delete delete = new Delete(indexRowKey);
             
@@ -1774,7 +1780,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ref);
             }
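
The buildUpdateMutation change above is where the masking takes effect: when the empty key value lookup returns the HIDDEN_BY_DELETE sentinel, no Put is built at all, so an upsert at the same timestamp as a row delete produces no index row. A simplified sketch of that control flow, with placeholder names rather than the real method and types:

    final class IndexUpdateSketch {
        static final byte[] HIDDEN_BY_DELETE = new byte[0];

        interface Getter {
            byte[] getLatestValue(String emptyColumn, long ts);
        }

        /** Returns null when the row's empty key value is masked by a delete at ts. */
        static byte[] buildUpdateOrNull(Getter getter, String emptyColumn, long ts) {
            byte[] latest = (getter == null) ? null : getter.getLatestValue(emptyColumn, ts);
            if (latest == HIDDEN_BY_DELETE) {
                // A delete family marker at the same timestamp wins: emit no Put at all.
                return null;
            }
            // Otherwise an index update would be assembled; a placeholder stands in here.
            return new byte[] { 1 };
        }
    }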

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 82360f7..c91e36e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -216,35 +216,36 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
                 minTimeStamp *= -1;
             }
             // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-            HTableInterface systemTable = env.getTable(SchemaUtil
-                    .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
-            MetaDataMutationResult result = IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp,
-                    systemTable, newState);
-            if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
-                LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
-                continue;
-            }
-            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                if (leaveIndexActive) {
-                    LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
-                            + result.getMutationCode());
-                    // If we're not disabling the index, then we don't want to throw as throwing
-                    // will lead to the RS being shutdown.
-                    if (blockDataTableWritesOnFailure) {
-                        throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
-                    }
-                } else {
-                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
-                            + result.getMutationCode() + ". Will use default failure policy instead.");
-                    throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
-                } 
+            try (HTableInterface systemTable = env.getTable(SchemaUtil
+                    .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
+                MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
+                        systemTable, newState);
+                if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+                    LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
+                    continue;
+                }
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                    if (leaveIndexActive) {
+                        LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
+                                + result.getMutationCode());
+                        // If we're not disabling the index, then we don't want to throw as throwing
+                        // will lead to the RS being shutdown.
+                        if (blockDataTableWritesOnFailure) {
+                            throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+                        }
+                    } else {
+                        LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                                + result.getMutationCode() + ". Will use default failure policy instead.");
+                        throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                    } 
+                }
+                if (leaveIndexActive)
+                    LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
+                            cause);
+                else
+                    LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
+                            cause);
             }
-            if (leaveIndexActive)
-                LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
-                        cause);
-            else
-                LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
-                        cause);
         }
         // Return the cell time stamp (note they should all be the same)
         return timestamp;
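
Beyond the rename to IndexUtil.updateIndexState, this hunk is a resource-handling fix: wrapping the SYSTEM.CATALOG handle in try-with-resources guarantees it is closed on every exit path, including the continue for dropped indexes and the DoNotRetryIOException throws. The idiom in isolation, with invented types standing in for the HBase table handle:

    import java.io.Closeable;
    import java.io.IOException;

    class FailurePolicySketch {
        interface Table extends Closeable {
            int updateState() throws IOException;
        }

        void handle(Iterable<Table> tables) throws IOException {
            for (Table t : tables) {
                // try-with-resources closes the handle on every exit path:
                // the continue, the throw, and normal completion.
                try (Table table = t) {
                    int code = table.updateState();
                    if (code == 0) {
                        continue; // e.g. the index was already dropped
                    }
                    if (code < 0) {
                        throw new IOException("state update failed");
                    }
                }
            }
        }
    }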

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 43b5fe5..5444360 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -555,7 +555,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             ValueGetter getter = new ValueGetter() {
 
                 @Override
-                public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
                     return valueMap.get(ref);
                 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/78f35a65/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 1f271f4..728b1e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -32,13 +32,17 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
  * Class used to construct a {@link Tuple} in order to evaluate an {@link Expression}
  */
 public class ValueGetterTuple extends BaseTuple {
-	private ValueGetter valueGetter;
+	private final ValueGetter valueGetter;
+	private final long ts;
     
-    public ValueGetterTuple(ValueGetter valueGetter) {
+    public ValueGetterTuple(ValueGetter valueGetter, long ts) {
         this.valueGetter = valueGetter;
+        this.ts = ts;
     }
     
     public ValueGetterTuple() {
+        this.valueGetter = null;
+        this.ts = HConstants.LATEST_TIMESTAMP;
     }
     
     @Override
@@ -55,7 +59,7 @@ public class ValueGetterTuple extends BaseTuple {
     public KeyValue getValue(byte[] family, byte[] qualifier) {
         ImmutableBytesWritable value = null;
         try {
-            value = valueGetter.getLatestValue(new ColumnReference(family, qualifier));
+            value = valueGetter.getLatestValue(new ColumnReference(family, qualifier), ts);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -92,7 +96,7 @@ public class ValueGetterTuple extends BaseTuple {
         KeyValue kv = getValue(family, qualifier);
         if (kv == null)
             return false;
-        ptr.set(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
         return true;
     }
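
ValueGetterTuple is the last link in the chain: expressions evaluated during index maintenance now see values as of the mutation's timestamp, because the tuple forwards the ts it was constructed with into every getLatestValue call. (The getBuffer() to getValueArray() switch in the same file looks like a move to the non-deprecated accessor; both return the cell's backing array here.) The timestamp-pinning pattern in a standalone form, with placeholder types:

    class TimestampedTupleSketch {
        interface Getter {
            byte[] getLatestValue(String column, long ts);
        }

        private final Getter getter;
        private final long ts;

        TimestampedTupleSketch(Getter getter, long ts) {
            this.getter = getter;
            this.ts = ts;
        }

        // Every lookup made while evaluating an expression against this tuple is pinned
        // to the same timestamp, so a delete at that timestamp masks the row consistently.
        byte[] getValue(String column) {
            return getter.getLatestValue(column, ts);
        }
    }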
 

