phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-4105 Fix tests broken due to PHOENIX-4089 (addendum 1)
Date Sun, 20 Aug 2017 23:53:37 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 1d5f31cf6 -> b4d21f991


PHOENIX-4105 Fix tests broken due to PHOENIX-4089 (addendum 1)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c0243f59
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c0243f59
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c0243f59

Branch: refs/heads/4.x-HBase-0.98
Commit: c0243f59e05800499a3de8feeeacedc1549644e6
Parents: 1d5f31c
Author: James Taylor <jamestaylor@apache.org>
Authored: Sat Aug 19 14:13:30 2017 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Sun Aug 20 16:48:58 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |   7 +-
 .../end2end/index/PartialIndexRebuilderIT.java  | 152 ++++++++++++++++---
 .../coprocessor/MetaDataRegionObserver.java     |   7 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |  63 +++++++-
 4 files changed, 198 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0243f59/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index bc5237a..f1be8b5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -288,7 +288,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 FailingRegionObserver.FAIL_WRITE = false;
                 if (rebuildIndexOnWriteFailure) {
                     // wait for index to be rebuilt automatically
-                    waitForIndexRebuild(conn,indexName, PIndexState.ACTIVE);
+                    waitForIndexRebuild(conn,fullIndexName, PIndexState.ACTIVE);
                 } else {
                     // simulate replaying failed mutation
                     replayMutations();
@@ -308,7 +308,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 // Wait for index to be rebuilt automatically. This should fail because
                 // we haven't flipped the FAIL_WRITE flag to false and as a result this
                 // should cause index rebuild to fail too.
-                waitForIndexRebuild(conn, indexName, PIndexState.DISABLE);
+                waitForIndexRebuild(conn, fullIndexName, PIndexState.DISABLE);
                 // verify that the index was marked as disabled and the index disable
                 // timestamp set to 0
                 String q =
@@ -326,9 +326,8 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
 
-    private void waitForIndexRebuild(Connection conn, String index, PIndexState expectedIndexState) throws InterruptedException, SQLException {
+    private void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
         if (!transactional) {
-            String fullIndexName = SchemaUtil.getTableName(schema, index);
             TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0243f59/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 7bd3e11..0dc9a35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end.index;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -28,12 +29,22 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -57,18 +68,101 @@ import com.google.common.collect.Maps;
 @RunWith(RunUntilFailure.class)
 public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static final Random RAND = new Random(5);
-    private static final int WAIT_AFTER_DISABLED = 10000;
+    private static final int WAIT_AFTER_DISABLED = 5000;
 
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "2000");
         serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
 
+    private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) {
+        Runnable[] runnables = new Runnable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+           runnables[i] = new Runnable() {
+    
+               @Override
+               public void run() {
+                   try {
+                       Connection conn = DriverManager.getConnection(getUrl());
+                       for (int i = 0; i < 3000; i++) {
+                           boolean isNull = RAND.nextBoolean();
+                           int randInt = RAND.nextInt() % nIndexValues;
+                           int pk = Math.abs(RAND.nextInt()) % nRows;
+                           conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + (isNull ? null : randInt) + ")");
+                           if ((i % batchSize) == 0) {
+                               conn.commit();
+                           }
+                       }
+                       conn.commit();
+                       for (int i = 0; i < 3000; i++) {
+                           int pk = Math.abs(RAND.nextInt()) % nRows;
+                           conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k1= " + pk + " AND k2=0");
+                           if (i % batchSize == 0) {
+                               conn.commit();
+                           }
+                       }
+                       conn.commit();
+                       for (int i = 0; i < 3000; i++) {
+                           int randInt = RAND.nextInt() % nIndexValues;
+                           int pk = Math.abs(RAND.nextInt()) % nRows;
+                           conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + randInt + ")");
+                           if ((i % batchSize) == 0) {
+                               conn.commit();
+                           }
+                       }
+                       conn.commit();
+                   } catch (SQLException e) {
+                       throw new RuntimeException(e);
+                   } finally {
+                       doneSignal.countDown();
+                   }
+               }
+                
+            };
+        }
+        for (int i = 0; i < nThreads; i++) {
+            Thread t = new Thread(runnables[i]);
+            t.start();
+        }
+    }
+    @Test
+    @Repeat(3)
+    public void testConcurrentUpsertsWithRebuild() throws Throwable {
+        int nThreads = 5;
+        final int batchSize = 200;
+        final int nRows = 51;
+        final int nIndexValues = 23;
+        final String schemaName = "";
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1");
+        //addDelayingCoprocessor(conn, tableName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
+        
+        final CountDownLatch doneSignal1 = new CountDownLatch(nThreads);
+        mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal1);
+        assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS));
+        
+        IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
+        do {
+            final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
+            mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
+            assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
+        } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+        
+        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        assertEquals(nRows, actualRowCount);
+    }
+
     private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows) throws Exception {
         return mutateRandomly(conn, fullTableName, nRows, false, null);
     }
@@ -144,7 +238,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    @Repeat(10)
+    @Repeat(3)
     public void testDeleteAndUpsertAfterFailure() throws Throwable {
         final int nRows = 10;
         String schemaName = generateUniqueName();
@@ -160,13 +254,26 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
             mutateRandomly(conn, fullTableName, nRows);
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
             
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
        }
     }
     
+    private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+        descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null);
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+        } finally {
+            admin.close();
+        }
+    }
+    
     @Test
     public void testWriteWhileRebuilding() throws Throwable {
         final int nRows = 10;
@@ -201,12 +308,15 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             t.start();
             long disableTS = EnvironmentEdgeManager.currentTimeMillis();
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
             doneSignal.await(60, TimeUnit.SECONDS);
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
             assertTrue(hasInactiveIndex[0]);
             
+            TestUtil.dumpIndexStatus(conn, fullIndexName);
+
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
+            
        }
     }
 
@@ -231,7 +341,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -258,7 +368,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -285,7 +395,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -310,7 +420,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("DELETE FROM " + fullTableName);
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
        }
@@ -335,7 +445,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -379,11 +489,9 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             clock.time += 1000;
             advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
             clock.time += 100;
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
         }
@@ -414,7 +522,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             clock.time += 1000;
             advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
             clock.time += 100;
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
@@ -447,10 +555,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             clock.time += 1000;
             advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
             clock.time += 100;
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -477,4 +583,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         t.setDaemon(true);
         t.start();
     }
+    
+    public static class DelayingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            try {
+                Thread.sleep(Math.abs(RAND.nextInt()) % 10);
+            } catch (InterruptedException e) {
+            }
+            
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0243f59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 350f4cd..4bac38d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -459,8 +459,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 										indexPTable.getSchemaName().getString(),
 										indexPTable.getTableName().getString());
 								if (scanEndTime == latestUpperBoundTimestamp) {
-								    // We compare the absolute value of the index disable timestamp. We don't want to
-								    // pass a negative value because that means an additional index write failed.
+								    // Finished building. Pass in the expected value for the index disabled timestamp
+								    // and only set to active if it hasn't changed (i.e. a write failed again, before
+								    // we're done). We take the absolute value because if the option to leave the
+								    // index active upon failure is used, we'll see a negative value when a write
+								    // fails.
 									IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, Math.abs(indexPTable.getIndexDisableTimestamp()));
 									batchExecutedPerTableMap.remove(dataPTable.getName());
                                     LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0243f59/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index af228b0..6afc796 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -121,6 +121,7 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 
@@ -128,6 +129,7 @@ import com.google.common.collect.Lists;
 public class TestUtil {
     private static final Log LOG = LogFactory.getLog(TestUtil.class);
     
+    private static final Long ZERO = new Long(0);
     public static final String DEFAULT_SCHEMA_NAME = "";
     public static final String DEFAULT_DATA_TABLE_NAME = "T";
     public static final String DEFAULT_INDEX_TABLE_NAME = "I";
@@ -855,34 +857,79 @@ public class TestUtil {
         System.out.println("-----------------------------------------------");
     }
 
+    public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
+        try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+            System.out.println("************ dumping index status for " + indexName + " **************");
+            Scan s = new Scan();
+            s.setRaw(true);
+            s.setMaxVersions();
+            byte[] startRow = SchemaUtil.getTableKeyFromFullName(indexName);
+            s.setStartRow(startRow);
+            s.setStopRow(ByteUtil.nextKey(ByteUtil.concat(startRow, QueryConstants.SEPARATOR_BYTE_ARRAY)));
+            try (ResultScanner scanner = table.getScanner(s)) {
+                Result result = null;
+                while ((result = scanner.next()) != null) {
+                    CellScanner cellScanner = result.cellScanner();
+                    Cell current = null;
+                    while (cellScanner.advance()) {
+                        current = cellScanner.current();
+                        if (Bytes.compareTo(current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength(), PhoenixDatabaseMetaData.INDEX_STATE_BYTES, 0, PhoenixDatabaseMetaData.INDEX_STATE_BYTES.length) == 0) {
+                            System.out.println(current.getTimestamp() + "/INDEX_STATE=" + PIndexState.fromSerializedValue(current.getValueArray()[current.getValueOffset()]));
+                        }
+                    }
+                }
+            }
+            System.out.println("-----------------------------------------------");
+    }
+    }
+
     public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState indexState) throws InterruptedException, SQLException {
         waitForIndexState(conn, fullIndexName, indexState, 0L);
     }
     
+    private enum IndexStateCheck {SUCCESS, FAIL, KEEP_TRYING};
     public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
         int maxTries = 300, nTries = 0;
         do {
             Thread.sleep(1000); // sleep 1 sec
-            if (checkIndexState(conn, fullIndexName, expectedIndexState, expectedIndexDisableTimestamp)) {
-                return;
+            IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, expectedIndexState, expectedIndexDisableTimestamp);
+            if (state != IndexStateCheck.KEEP_TRYING) {
+                if (state == IndexStateCheck.SUCCESS) {
+                    return;
+                }
+                fail("Index state will not become " + expectedIndexState);
             }
         } while (++nTries < maxTries);
         fail("Ran out of time waiting for index state to become " + expectedIndexState);
     }
 
-    public static boolean checkIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
+    public static boolean checkIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
+        return checkIndexStateInternal(conn,fullIndexName, expectedIndexState, expectedIndexDisableTimestamp) == IndexStateCheck.SUCCESS;
+    }
+    private static IndexStateCheck checkIndexStateInternal(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
         String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
         String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
-        String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
+        String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT)," + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " +
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
                 + ") = (" + "'" + schema + "','" + index + "') "
-                + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL"
-                + " AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + expectedIndexState.getSerializedValue() + "'";
+                + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
         ResultSet rs = conn.createStatement().executeQuery(query);
         if (rs.next()) {
-            return expectedIndexDisableTimestamp == null || (rs.getLong(1) == 0 && !rs.wasNull());
+            Long actualIndexDisableTimestamp = rs.getLong(1);
+            if (rs.wasNull()) {
+                actualIndexDisableTimestamp = null;
+            }
+            PIndexState actualIndexState = PIndexState.fromSerializedValue(rs.getString(2));
+            boolean matchesExpected = Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp) &&
+                    actualIndexState == expectedIndexState;
+            if (matchesExpected) {
+                return IndexStateCheck.SUCCESS;
+            }
+            if (ZERO.equals(actualIndexDisableTimestamp)) {
+                return IndexStateCheck.FAIL;
+            }
         }
-        return false;
+        return IndexStateCheck.KEEP_TRYING;
     }
 
     public static long getRowCount(Connection conn, String tableName) throws SQLException {


Mime
View raw message