phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-4109 Ensure mutations are processed in batches with same time stamp during partial rebuild
Date Mon, 21 Aug 2017 18:10:28 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 b4d21f991 -> e51e590ef


PHOENIX-4109 Ensure mutations are processed in batches with same time stamp during partial rebuild


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e51e590e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e51e590e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e51e590e

Branch: refs/heads/4.x-HBase-0.98
Commit: e51e590efee5be0c2a1a73fb914b8267519b9b87
Parents: b4d21f9
Author: James Taylor <jamestaylor@apache.org>
Authored: Sun Aug 20 16:55:48 2017 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Sun Aug 20 16:55:48 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/PartialIndexRebuilderIT.java  | 174 ++++++++++++++++---
 .../org/apache/phoenix/hbase/index/Indexer.java |   2 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |   4 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |   2 +-
 4 files changed, 156 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e51e590e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 0dc9a35..f42d2c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -18,9 +18,9 @@
 package org.apache.phoenix.end2end.index;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -29,8 +29,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -38,13 +38,10 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -54,6 +51,7 @@ import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.Repeat;
 import org.apache.phoenix.util.RunUntilFailure;
@@ -131,7 +129,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
     @Test
-    @Repeat(3)
+    @Repeat(5)
     public void testConcurrentUpsertsWithRebuild() throws Throwable {
         int nThreads = 5;
         final int batchSize = 200;
@@ -145,7 +143,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         Connection conn = DriverManager.getConnection(getUrl());
         HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 INTEGER NOT
NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true,
VERSIONS=1");
-        //addDelayingCoprocessor(conn, tableName);
         conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName
+ "(v1)");
         
         final CountDownLatch doneSignal1 = new CountDownLatch(nThreads);
@@ -177,6 +174,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         return false;
     }
     
+    private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException
{
+        PTable table = metaCache.getTableRef(key).getTable();
+        for (PTable index : table.getIndexes()) {
+            if (index.getIndexState() == PIndexState.DISABLE) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows,
boolean checkForInactive, String fullIndexName) throws SQLException, InterruptedException
{
         PTableKey key = new PTableKey(null,fullTableName);
         PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
@@ -238,7 +245,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    @Repeat(3)
+    @Repeat(5)
     public void testDeleteAndUpsertAfterFailure() throws Throwable {
         final int nRows = 10;
         String schemaName = generateUniqueName();
@@ -261,19 +268,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
        }
     }
     
-    private static void addDelayingCoprocessor(Connection conn, String tableName) throws
SQLException, IOException {
-        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
-        descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority,
null);
-        HBaseAdmin admin = services.getAdmin();
-        try {
-            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
-        } finally {
-            admin.close();
-        }
-    }
-    
     @Test
     public void testWriteWhileRebuilding() throws Throwable {
         final int nRows = 10;
@@ -498,6 +492,110 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
+    public void testSeparateTimeBatchesRequired() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR
PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName
+ " (v1, v2)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
+            conn.commit();
+            clock.time += 100;
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+            clock.time += 100;
+            long disableTime = clock.currentTime();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb',
'11')");
+            conn.commit();
+            clock.time += 100;
+            assertTrue(hasDisabledIndex(metaCache, key));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','0')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')");
+            conn.commit();
+            clock.time += 100;
+            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+            clock.time += 100;
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+    
+    @Test
+    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR
PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName
+ " (v1, v2) INCLUDE (v3)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+            conn.commit();
+            clock.time += 100;
+            try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES))
{
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index
rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+                clock.time += 100;
+                long disableTime = clock.currentTime();
+                // Set some values while index disabled
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb',
'11','yy')");
+                conn.commit();
+                clock.time += 100;
+                assertTrue(hasDisabledIndex(metaCache, key));
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
+                conn.commit();
+                clock.time += 100;
+                // Will cause partial index rebuilder to be triggered
+                IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+            }
+            final CountDownLatch doneSignal = new CountDownLatch(1);
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
+            doneSignal.await(30, TimeUnit.SECONDS);
+            // Set some values while index is in INACTIVE state
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+            conn.commit();
+            clock.time += WAIT_AFTER_DISABLED;
+            // Enough time has passed, so rebuild will start now
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+
+    @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
@@ -563,7 +661,37 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName,
final MyClock clock) {
+    @Test
+    public void testRegionsOnlineCheck() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR
PRIMARY KEY)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a')");
+            conn.commit();
+            Configuration conf = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+            PTable table = metaCache.getTableRef(key).getTable();
+            assertTrue(MetaDataUtil.tableRegionsOnline(conf, table));
+            try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin())
{
+                admin.disableTable(fullTableName);
+                assertFalse(MetaDataUtil.tableRegionsOnline(conf, table));
+                admin.enableTable(fullTableName);
+            }
+            assertTrue(MetaDataUtil.tableRegionsOnline(conf, table));
+        }
+    }
+    
+    private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName,
final MyClock clock) throws InterruptedException {
+        final CountDownLatch doneSignal = new CountDownLatch(1);
+        advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
+        clock.time += WAIT_AFTER_DISABLED + 1000;
+        doneSignal.await(30, TimeUnit.SECONDS);
+    }
+    
+    private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName,
final MyClock clock, final CountDownLatch doneSignal) {
         Runnable r = new Runnable() {
             @Override
             public void run() {
@@ -573,7 +701,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                         Thread.sleep(1000);
                         clock.time += 1000;
                     }
-                    clock.time += WAIT_AFTER_DISABLED + 1000;
+                    doneSignal.countDown();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e51e590e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 35dbe08..18c5dbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -524,7 +524,7 @@ public class Indexer extends BaseRegionObserver {
           miniBatchOp.setWalEdit(0, edit);
       }
   
-      if (copyMutations) {
+      if (copyMutations || replayWrite != null) {
           mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
       }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e51e590e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index c013b5b..50e2c3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -93,7 +93,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family);
             for (KeyValue kv : kvs) {
                 batch.add(kv);
-                assert(ts == kv.getTimestamp());
+                if(ts != kv.getTimestamp()) {
+                    throw new IllegalStateException("Time stamps must match for all cells in a batch");
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e51e590e/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index ca13281..ca2ae6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -533,7 +533,7 @@ public class MetaDataUtil {
         try {
             hcon = HConnectionManager.getConnection(conf);
             List<HRegionLocation> locations = hcon.locateRegions(
-                org.apache.hadoop.hbase.TableName.valueOf(table.getTableName().getBytes()));
+                org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes()));
 
             for (HRegionLocation loc : locations) {
                 try {


Mime
View raw message