Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0CDAD200BC6 for ; Sun, 20 Nov 2016 16:12:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 009BC160B07; Sun, 20 Nov 2016 15:12:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F198F160AF1 for ; Sun, 20 Nov 2016 16:12:27 +0100 (CET) Received: (qmail 97021 invoked by uid 500); 20 Nov 2016 15:12:27 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 97012 invoked by uid 99); 20 Nov 2016 15:12:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 20 Nov 2016 15:12:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ACF7EE0BDB; Sun, 20 Nov 2016 15:12:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tedyu@apache.org To: commits@hbase.apache.org Message-Id: <6f3b2dd190454a90bf02003a4b5f581f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai) Date: Sun, 20 Nov 2016 15:12:26 +0000 (UTC) archived-at: Sun, 20 Nov 2016 15:12:29 -0000 Repository: hbase Updated Branches: refs/heads/master ec9c9e201 -> bb645bcfd HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bb645bcf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bb645bcf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bb645bcf Branch: refs/heads/master Commit: bb645bcfda74ad1f96b16e6e47543d44fbca5a98 Parents: ec9c9e2 Author: tedyu Authored: Sun Nov 20 07:09:02 2016 -0800 Committer: tedyu Committed: Sun Nov 20 07:09:02 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 26 ++-- .../hbase/client/TestFromClientSide3.java | 124 +++++++++++++++++-- 2 files changed, 129 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/bb645bcf/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4d35b51..c372faa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7044,8 +7044,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } - boolean locked; - List acquiredRowLocks; + boolean locked = false; + List acquiredRowLocks = null; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); // This is assigned by mvcc either explicity in the below or in the guts of the WAL append @@ -7053,19 +7053,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteEntry writeEntry = null; MemstoreSize memstoreSize = new MemstoreSize(); try { - // STEP 2. Acquire the row lock(s) - acquiredRowLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { - // Attempt to lock all involved rows, throw if any lock times out - // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLockInternal(row, false)); - } - // STEP 3. Region lock - lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); - locked = true; boolean success = false; - long now = EnvironmentEdgeManager.currentTime(); try { + // STEP 2. Acquire the row lock(s) + acquiredRowLocks = new ArrayList<>(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes + acquiredRowLocks.add(getRowLockInternal(row, false)); + } + // STEP 3. Region lock + lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); + locked = true; + long now = EnvironmentEdgeManager.currentTime(); // STEP 4. Let the processor scan the rows, generate mutations and add waledits doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/bb645bcf/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 82fbe77..cbc97a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -23,13 +23,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -43,10 +47,17 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -54,6 +65,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -494,7 +506,7 @@ public class TestFromClientSide3 { } @Test(timeout = 60000) - public void testPutWithPreBatchMutate ()throws Exception { + public void testPutWithPreBatchMutate() throws Exception { TableName tableName = TableName.valueOf("testPutWithPreBatchMutate"); testPreBatchMutate(tableName, () -> { try { @@ -509,7 +521,7 @@ public class TestFromClientSide3 { } @Test(timeout = 60000) - public void testRowMutationsWithPreBatchMutate ()throws Exception { + public void testRowMutationsWithPreBatchMutate() throws Exception { TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate"); testPreBatchMutate(tableName, () -> { try { @@ -525,7 +537,7 @@ public class TestFromClientSide3 { }); } - private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception { + private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { HTableDescriptor desc = new HTableDescriptor(tableName); desc.addCoprocessor(WatiingForScanObserver.class.getName()); desc.addFamily(new HColumnDescriptor(FAMILY)); @@ -555,22 +567,118 @@ public class TestFromClientSide3 { TEST_UTIL.deleteTable(tableName); } - private static T find(final TableName tableName, - Class clz) throws IOException, InterruptedException { + @Test(timeout = 30000) + public void testMultiRowMutations() throws Exception, Throwable { + TableName tableName = TableName.valueOf("testMultiRowMutations"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); + desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName()); + desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); + desc.addFamily(new HColumnDescriptor(FAMILY)); + TEST_UTIL.getAdmin().createTable(desc); + // new a connection for lower retry number. + Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); + copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + try (Connection con = ConnectionFactory.createConnection(copy)) { + byte[] row = Bytes.toBytes("ROW-0"); + byte[] rowLocked= Bytes.toBytes("ROW-1"); + byte[] value0 = Bytes.toBytes("VALUE-0"); + byte[] value1 = Bytes.toBytes("VALUE-1"); + byte[] value2 = Bytes.toBytes("VALUE-2"); + assertNoLocks(tableName); + ExecutorService putService = Executors.newSingleThreadExecutor(); + putService.execute(() -> { + try (Table table = con.getTable(tableName)) { + Put put0 = new Put(rowLocked); + put0.addColumn(FAMILY, QUALIFIER, value0); + // the put will be blocked by WatiingForMultiMutationsObserver. + table.put(put0); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + ExecutorService cpService = Executors.newSingleThreadExecutor(); + cpService.execute(() -> { + Put put1 = new Put(row); + Put put2 = new Put(rowLocked); + put1.addColumn(FAMILY, QUALIFIER, value1); + put2.addColumn(FAMILY, QUALIFIER, value2); + try (Table table = con.getTable(tableName)) { + MultiRowMutationProtos.MutateRowsRequest request + = MultiRowMutationProtos.MutateRowsRequest.newBuilder() + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1)) + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2)) + .build(); + table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, + ROW, ROW, + (MultiRowMutationProtos.MultiRowMutationService exe) -> { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback + rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); + exe.mutateRows(controller, request, rpcCallback); + return rpcCallback.get(); + }); + fail("This cp should fail because the target lock is blocked by previous put"); + } catch (Throwable ex) { + } + }); + cpService.shutdown(); + cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class); + observer.latch.countDown(); + putService.shutdown(); + putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + try (Table table = con.getTable(tableName)) { + Get g0 = new Get(row); + Get g1 = new Get(rowLocked); + Result r0 = table.get(g0); + Result r1 = table.get(g1); + assertTrue(r0.isEmpty()); + assertFalse(r1.isEmpty()); + assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); + } + assertNoLocks(tableName); + } + } + + private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { + HRegion region = (HRegion) find(tableName); + assertEquals(0, region.getLockedRows().size()); + } + private static Region find(final TableName tableName) + throws IOException, InterruptedException { HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); List regions = rs.getOnlineRegions(tableName); assertEquals(1, regions.size()); - Region region = regions.get(0); + return regions.get(0); + } + + private static T find(final TableName tableName, + Class clz) throws IOException, InterruptedException { + Region region = find(tableName); Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); assertTrue("The cp instance should be " + clz.getName() + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); return clz.cast(cp); } - public static class WatiingForScanObserver extends BaseRegionObserver { + public static class WatiingForMultiMutationsObserver extends BaseRegionObserver { + final CountDownLatch latch = new CountDownLatch(1); + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + try { + latch.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + } + public static class WatiingForScanObserver extends BaseRegionObserver { private final CountDownLatch latch = new CountDownLatch(1); - @Override public void postBatchMutate(final ObserverContext c, final MiniBatchOperationInProgress miniBatchOp) throws IOException {