phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
Date Fri, 29 Sep 2017 19:38:21 GMT
PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/519273b8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/519273b8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/519273b8

Branch: refs/heads/4.x-HBase-1.2
Commit: 519273b85dbc7f2d8d3fe71fd434bb8a396d3b04
Parents: 64ef10b
Author: James Taylor <jtaylor@salesforce.com>
Authored: Fri Sep 29 12:26:41 2017 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Fri Sep 29 12:36:06 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 245 +++++++++++++++----
 .../UngroupedAggregateRegionObserver.java       |  16 +-
 2 files changed, 210 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..dc9de81 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -32,35 +33,59 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-    
+    private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+    private Properties props;
+    private static volatile String dataTable;
+    private String index;
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
         serverProps.put("hbase.coprocessor.region.classes", SlowBatchRegionObserver.class.getName());
         serverProps.put("hbase.rowlock.wait.duration", "5000");
         serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100");
-        Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()));
     }
-    
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        getUtility().shutdownMiniCluster();
+    }
+
     private class UpsertSelectRunner implements Callable<Boolean> {
     	private final String dataTable;
     	private final int minIndex;
@@ -89,58 +114,186 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
 				return true;
 			}
 		}
-    	
     }
-    
+
+    private static class UpsertSelectLooper implements Runnable {
+        private UpsertSelectRunner runner;
+        public UpsertSelectLooper(UpsertSelectRunner runner) {
+            this.runner = runner;
+        }
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    runner.call();
+                }
+                catch (Exception e) {
+                    if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) {
+                        logger.info("Interrupted, exiting", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    logger.error("Hit exception while writing", e);
+                }
+            }
+        }};
+
+    @Before
+    public void setup() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+        dataTable = generateUniqueName();
+        index = "IDX_" + dataTable;
+        try (Connection conn = driver.connect(url, props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTable
+                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create the index and ensure its empty as well
+            conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+            conn.setAutoCommit(false);
+            for (int i = 0; i < 100; i++) {
+                stmt.setInt(1, i);
+                stmt.setString(2, "v1" + i);
+                stmt.setString(3, "v2" + i);
+                stmt.execute();
+            }
+            conn.commit();
+        }
+    }
+
 	@Test
 	public void testUpsertSelectSameBatchConcurrently() throws Exception {
-		final String dataTable = generateUniqueName();
-		final String index = "IDX_" + dataTable;
-		// create the table and ensure its empty
-		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-		Connection conn = driver.connect(url, props);
-		conn.createStatement()
-				.execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-		// create the index and ensure its empty as well
-		conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-
-		conn = DriverManager.getConnection(getUrl(), props);
-		PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
-		conn.setAutoCommit(false);
-		for (int i = 0; i < 100; i++) {
-			stmt.setInt(1, i);
-			stmt.setString(2, "v1" + i);
-			stmt.setString(3, "v2" + i);
-			stmt.execute();
-		}
-		conn.commit();
-
-		int numUpsertSelectRunners = 5;
-		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
-		CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
-		List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-		// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
-		futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
-		// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
-		for (int i = 0; i < 100; i += 25) {
-			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
-		}
-		int received = 0;
-		while (received < futures.size()) {
-			Future<Boolean> resultFuture = completionService.take(); 
-			Boolean result = resultFuture.get();
-			received++;
-			assertTrue(result);
+		try (Connection conn = driver.connect(url, props)) {
+		        int numUpsertSelectRunners = 5;
+		        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+		        CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+		        List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+		        // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+		        futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+		        // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+		        for (int i = 0; i < 100; i += 25) {
+		            futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+		        }
+		        int received = 0;
+		        while (received < futures.size()) {
+		            Future<Boolean> resultFuture = completionService.take();
+		            Boolean result = resultFuture.get();
+		            received++;
+		            assertTrue(result);
+		        }
+		        exec.shutdownNow();
 		}
-		exec.shutdownNow();
-		conn.close();
 	}
+
+    /**
+     * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load
+     */
+	@Test
+    public void testSplitDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            // keep trying to split the region
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final TableName dataTN = TableName.valueOf(dataTable);
+            assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size());
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<HRegionInfo> regions = admin.getTableRegions(dataTN);
+                        if (regions.size() > 1) {
+                            logger.info("Found region was split");
+                            return true;
+                        }
+                        if (regions.size() == 0) {
+                            // This happens when region in transition or closed
+                            logger.info("No region returned");
+                            return false;
+                        }
+                        ;
+                        HRegionInfo hRegion = regions.get(0);
+                        logger.info("Attempting to split region");
+                        admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2));
+                        return false;
+                    } catch (NotServingRegionException nsre) {
+                        // during split
+                        return false;
+                    }
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Tests that UPSERT SELECT doesn't indefinitely block region closes
+     */
+    @Test
+    public void testRegionCloseDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            final HBaseTestingUtility utility = getUtility();
+            // try to close the region while UPSERT SELECTs are happening,
+            final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0);
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final HRegionInfo dataRegion =
+                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+            logger.info("Closing data table region");
+            admin.closeRegion(dataRs.getServerName(), dataRegion);
+            // make sure the region is offline
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    List<HRegionInfo> onlineRegions =
+                            admin.getOnlineRegions(dataRs.getServerName());
+                    for (HRegionInfo onlineRegion : onlineRegions) {
+                        if (onlineRegion.equals(dataRegion)) {
+                            logger.info("Data region still online");
+                            return false;
+                        }
+                    }
+                    logger.info("Region is no longer online");
+                    return true;
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
         	// model a slow batch that takes a long time
-            if (miniBatchOp.size()==100) {
+            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
             	try {
 					Thread.sleep(6000);
 				} catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 30f89cb..c3024a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosing = false;
+    private boolean isRegionClosingOrSplitting = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosing) {
+            if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
             }
@@ -499,10 +499,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
+        boolean incrScanRefCount = false;
         try {
             if(needToWrite) {
                 synchronized (lock) {
+                    if (isRegionClosingOrSplitting) {
+                        throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
+                    }
                     scansReferenceCount++;
+                    incrScanRefCount = true;
                     lock.notifyAll();
                 }
             }
@@ -755,7 +760,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             }
         } finally {
-            if (needToWrite) {
+            if (needToWrite && incrScanRefCount) {
                 synchronized (lock) {
                     scansReferenceCount--;
                     if (scansReferenceCount < 0) {
@@ -1295,6 +1300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
+            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1319,7 +1325,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosing = true;
+            isRegionClosingOrSplitting = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);


Mime
View raw message