phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cheng...@apache.org
Subject phoenix git commit: PHOENIX-4094 ParallelWriterIndexCommitter incorrectly applies local updates to index tables for 4.x-HBase-0.98
Date Fri, 18 Aug 2017 03:17:42 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 1e777b2b8 -> 2a38617a0


PHOENIX-4094 ParallelWriterIndexCommitter incorrectly applies local updates to index tables
for 4.x-HBase-0.98


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a38617a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a38617a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a38617a

Branch: refs/heads/4.x-HBase-0.98
Commit: 2a38617a0b80e9e52ddc3cd9661894013bc822b9
Parents: 1e777b2
Author: chenglei <chenglei@apache.org>
Authored: Fri Aug 18 11:16:37 2017 +0800
Committer: chenglei <chenglei@apache.org>
Committed: Fri Aug 18 11:16:37 2017 +0800

----------------------------------------------------------------------
 .../wal/WALRecoveryRegionPostOpenIT.java        | 311 +++++++++++++++++++
 .../write/ParallelWriterIndexCommitter.java     |   5 +-
 2 files changed, 315 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a38617a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
new file mode 100644
index 0000000..2fe8708
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
@@ -0,0 +1,311 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

/**
 * Integration test for PHOENIX-4094: after a data-table region server is killed, WAL
 * recovery on the surviving server must route the pending index update to the (remote)
 * index table rather than incorrectly applying it as a local update.
 *
 * The test keeps the data table and index table on different region servers, blocks
 * index-table writes via a failing coprocessor, kills the data table's server, and then
 * verifies that the recovery failure policy captured exactly the index-table mutation
 * and that both tables end up consistent once writes are re-enabled.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class WALRecoveryRegionPostOpenIT extends BaseTest {

    private static final Log LOG = LogFactory.getLog(WALRecoveryRegionPostOpenIT.class);

    private static final String DATA_TABLE_NAME = "DATA_POST_OPEN";

    private static final String INDEX_TABLE_NAME = "INDEX_POST_OPEN";

    private static final long ONE_SEC = 1000;
    private static final long ONE_MIN = 60 * ONE_SEC;
    private static final long TIMEOUT = ONE_MIN;

    // Counted down by ReleaseLatchOnFailurePolicy once an index-write failure is handled.
    private static volatile CountDownLatch handleFailureCountDownLatch = null;

    // The mutations that failed to be written during WAL recovery, captured for inspection.
    private static volatile Multimap<HTableInterfaceReference, Mutation> tableReferenceToMutation = null;

    // Number of handleFailure invocations. NOTE: volatile increment is not atomic, but the
    // test expects exactly one failure, so a single writer is assumed here.
    private static volatile int handleFailureCalledCount = 0;

    // While true, IndexTableFailingRegionObserver rejects writes to the index table.
    private static volatile boolean failIndexTableWrite = false;

    @BeforeClass
    public static void doSetup() throws Exception {
        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
        serverProps.put("hbase.coprocessor.region.classes", IndexTableFailingRegionObserver.class.getName());
        serverProps.put(Indexer.RecoveryFailurePolicyKeyForTesting, ReleaseLatchOnFailurePolicy.class.getName());
        serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER, "2");
        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
        serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000");
        serverProps.put("data.tx.snapshot.dir", "/tmp");
        // Effectively disable the balancer so regions stay where the test places them.
        serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
        Map<String, String> clientProps =
                Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.FALSE.toString());
        // Two region servers so the data table and index table can live on different servers.
        NUM_SLAVES_BASE = 2;
        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                new ReadOnlyProps(clientProps.entrySet().iterator()));
    }

    /**
     * Region observer that fails any write touching the index table (or a local-index
     * column family) while {@link #failIndexTableWrite} is set.
     */
    public static class IndexTableFailingRegionObserver extends SimpleRegionObserver {

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {

            if (observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()
                    .contains(INDEX_TABLE_NAME) && failIndexTableWrite) {
                throw new DoNotRetryIOException();
            }
            Mutation operation = miniBatchOp.getOperation(0);
            Set<byte[]> keySet = operation.getFamilyMap().keySet();
            for (byte[] family : keySet) {
                if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
                        && failIndexTableWrite) {
                    throw new DoNotRetryIOException();
                }
            }
            super.preBatchMutate(observerContext, miniBatchOp);
        }
    }

    /**
     * Recovery failure policy that records the failed index mutations, re-enables index
     * writes, and releases the test's latch so the test thread can proceed.
     */
    public static class ReleaseLatchOnFailurePolicy extends StoreFailuresInCachePolicy {

        public ReleaseLatchOnFailurePolicy(PerRegionIndexWriteCache failedIndexEdits) {
            super(failedIndexEdits);
        }

        @Override
        public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
                Exception cause) throws IOException {
            LOG.info("Found index update failure!");
            handleFailureCalledCount++;
            tableReferenceToMutation = attempted;
            LOG.info("failed index update on WAL recovery - allowing index table can be write.");
            // Let subsequent (retried) index writes succeed.
            failIndexTableWrite = false;
            super.handleFailure(attempted, cause);

            if (handleFailureCountDownLatch != null) {
                handleFailureCountDownLatch.countDown();
            }
        }
    }

    @Test
    public void testRecoveryRegionPostOpen() throws Exception {
        // Reset shared state so the test is re-runnable within one JVM.
        handleFailureCountDownLatch = null;
        tableReferenceToMutation = null;
        handleFailureCalledCount = 0;
        failIndexTableWrite = false;

        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

        try (Connection conn = driver.connect(url, props)) {
            conn.setAutoCommit(true);
            conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_NAME
                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) ");

            conn.createStatement().execute(
                    "CREATE " + "INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + " (v1) INCLUDE (v2)");
            String query = "SELECT * FROM " + DATA_TABLE_NAME;
            ResultSet resultSet = conn.createStatement().executeQuery(query);
            assertFalse(resultSet.next());

            MiniHBaseCluster miniHBaseCluster = getUtility().getMiniHBaseCluster();
            this.moveIndexTableRegionIfSameRegionServer(miniHBaseCluster);
            this.assertRegionServerDifferent(miniHBaseCluster);

            // Load one row into the data table; the index row is written as part of the commit.
            PreparedStatement stmt =
                    conn.prepareStatement("UPSERT INTO " + DATA_TABLE_NAME + " VALUES(?,?,?)");
            stmt.setString(1, "a");
            stmt.setString(2, "x");
            stmt.setString(3, "1");
            stmt.execute();

            this.assertRegionServerDifferent(miniHBaseCluster);

            Scan scan = new Scan();
            HTable primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME);
            ResultScanner resultScanner = primaryTable.getScanner(scan);
            int count = 0;
            for (Result result : resultScanner) {
                count++;
            }
            // Close the scanner instead of leaking it (original never closed this one).
            resultScanner.close();
            assertEquals("Got an unexpected number of data rows", 1, count);

            // Kill the data table's region server. Its region moves to the surviving server,
            // whose WAL recovery then replays the pending index update — with index-table
            // writes still blocked, so the recovery failure path is exercised.
            handleFailureCountDownLatch = new CountDownLatch(1);
            failIndexTableWrite = true;

            ServerName dataTableRegionServerName =
                    this.getRegionServerName(miniHBaseCluster, DATA_TABLE_NAME);

            miniHBaseCluster.killRegionServer(dataTableRegionServerName);
            miniHBaseCluster.waitForRegionServerToStop(dataTableRegionServerName, TIMEOUT);

            // Only one region server remains now.
            // (expected/actual order fixed: expected value comes first in assertEquals)
            assertEquals("miniHBaseCluster.getLiveRegionServerThreads()", 1,
                    miniHBaseCluster.getLiveRegionServerThreads().size());
            HRegionServer liveRegionServer =
                    miniHBaseCluster.getLiveRegionServerThreads().get(0).getRegionServer();

            // Verify handleFailure is called; bounded wait so the test cannot hang forever.
            assertTrue("handleFailure was never invoked",
                    handleFailureCountDownLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
            assertEquals(1, handleFailureCalledCount);
            Map<HTableInterfaceReference, Collection<Mutation>> tableReferenceToMutations =
                    tableReferenceToMutation.asMap();
            assertEquals("tableReferenceToMutation.size()", 1, tableReferenceToMutations.size());
            Iterator<Map.Entry<HTableInterfaceReference, Collection<Mutation>>> iter =
                    tableReferenceToMutations.entrySet().iterator();
            assertTrue(iter.hasNext());
            Map.Entry<HTableInterfaceReference, Collection<Mutation>> entry = iter.next();
            // The failed write must target the index table, never the data table (PHOENIX-4094).
            assertTrue(entry.getKey().getTableName().equals(INDEX_TABLE_NAME));
            Mutation[] mutations = entry.getValue().toArray(new Mutation[0]);
            assertEquals("mutations size " + mutations[0], 1, mutations.length);
            assertTrue(mutations[0] instanceof Put);
            // The captured mutation's row key is an index row key, not the data row key "a".
            assertFalse(Arrays.equals(mutations[0].getRow(), Bytes.toBytes("a")));

            // Wait for the data table region to reopen on the surviving server.
            List<HRegion> dataTableRegions = null;
            for (int i = 1; i <= 200; i++) {
                dataTableRegions = liveRegionServer.getOnlineRegions(TableName.valueOf(DATA_TABLE_NAME));
                if (dataTableRegions.size() > 0) {
                    break;
                }
                Thread.sleep(ONE_SEC);
            }

            dataTableRegions = liveRegionServer.getOnlineRegions(TableName.valueOf(DATA_TABLE_NAME));
            assertEquals(1, dataTableRegions.size());

            // The index table holds exactly one row after the retried write succeeded.
            HTable indexTable = new HTable(getUtility().getConfiguration(), INDEX_TABLE_NAME);
            resultScanner = indexTable.getScanner(scan);
            count = 0;
            for (Result result : resultScanner) {
                count++;
            }
            assertEquals("Got an unexpected number of index rows", 1, count);
            resultScanner.close();
            indexTable.close();

            // Re-scan the data table through a fresh region-location cache.
            scan = new Scan();
            primaryTable.close();
            primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME);
            primaryTable.getConnection().clearRegionCache();
            resultScanner = primaryTable.getScanner(scan);
            count = 0;
            for (Result result : resultScanner) {
                LOG.info("Got data table result:" + result);
                count++;
            }
            assertEquals("Got an unexpected number of data rows", 1, count);
            // Close the final scanner too (original leaked it).
            resultScanner.close();

            // cleanup
            primaryTable.close();
        }
    }

    /** Returns the server currently hosting the single region of {@code tableName}. */
    private ServerName getRegionServerName(MiniHBaseCluster miniHBaseCluster, String tableName)
            throws IOException {
        List<HRegion> regions = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
        assertEquals(1, regions.size());
        HRegion region = regions.get(0);
        return miniHBaseCluster.getServerHoldingRegion(region.getRegionName());
    }

    /** Asserts the data table and index table regions are hosted by different servers. */
    private void assertRegionServerDifferent(MiniHBaseCluster miniHBaseCluster) throws IOException {
        ServerName dataTableRegionServerName =
                this.getRegionServerName(miniHBaseCluster, DATA_TABLE_NAME);
        ServerName indexTableRegionServerName =
                this.getRegionServerName(miniHBaseCluster, INDEX_TABLE_NAME);
        assertFalse(dataTableRegionServerName.equals(indexTableRegionServerName));
    }

    /**
     * If the data and index table regions are co-located, moves the index table region to
     * a different region server. (Method name typo fixed: ...SErver -> ...Server.)
     */
    private void moveIndexTableRegionIfSameRegionServer(MiniHBaseCluster miniHBaseCluster)
            throws IOException, InterruptedException {
        List<HRegion> dataTableRegions = miniHBaseCluster.getRegions(Bytes.toBytes(DATA_TABLE_NAME));
        assertEquals(1, dataTableRegions.size());
        List<HRegion> indexTableRegions = miniHBaseCluster.getRegions(Bytes.toBytes(INDEX_TABLE_NAME));
        assertEquals(1, indexTableRegions.size());

        HRegion dataTableRegion = dataTableRegions.get(0);
        HRegion indexTableRegion = indexTableRegions.get(0);
        int dataTableRegionServerIndex = miniHBaseCluster.getServerWith(dataTableRegion.getRegionName());
        int indexTableRegionServerIndex = miniHBaseCluster.getServerWith(indexTableRegion.getRegionName());
        if (dataTableRegionServerIndex != indexTableRegionServerIndex) {
            return;
        }

        // Pick the first server index that is not the index table's current server.
        int newRegionServerIndex = 0;
        while (newRegionServerIndex == indexTableRegionServerIndex) {
            newRegionServerIndex++;
        }

        HRegionServer newRegionServer = miniHBaseCluster.getRegionServer(newRegionServerIndex);
        this.moveRegionAndWait(miniHBaseCluster, indexTableRegion, newRegionServer);
    }

    /** Moves {@code destRegion} to {@code destRegionServer} and blocks until assignment completes. */
    private void moveRegionAndWait(MiniHBaseCluster miniHBaseCluster, HRegion destRegion,
            HRegionServer destRegionServer) throws IOException, InterruptedException {
        HMaster master = miniHBaseCluster.getMaster();
        getUtility().getHBaseAdmin().move(
                destRegion.getRegionInfo().getEncodedNameAsBytes(),
                Bytes.toBytes(destRegionServer.getServerName().getServerName()));
        while (true) {
            ServerName currentRegionServerName =
                    master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
            if (currentRegionServerName != null
                    && currentRegionServerName.equals(destRegionServer.getServerName())) {
                getUtility().assertRegionOnServer(
                        destRegion.getRegionInfo(), currentRegionServerName, 200);
                break;
            }
            Thread.sleep(10);
        }
    }
}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a38617a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 6f40f5b..59a7bd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -148,7 +148,10 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                     }
                     HTableInterface table = null;
                     try {
-                        if (allowLocalUpdates && env != null) {
+                        if (allowLocalUpdates
+                                && env != null
+                                && tableReference.getTableName().equals(
+                                    env.getRegion().getTableDesc().getNameAsString())) {
                             try {
                                 throwFailureIfDone();
                                 IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);


Mime
View raw message