From commits-return-64446-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Thu Jan 4 09:19:41 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 35EB8180657 for ; Thu, 4 Jan 2018 09:19:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2648B160C44; Thu, 4 Jan 2018 08:19:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1C0EB160C42 for ; Thu, 4 Jan 2018 09:19:39 +0100 (CET) Received: (qmail 27835 invoked by uid 500); 4 Jan 2018 08:19:37 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 27709 invoked by uid 99); 4 Jan 2018 08:19:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jan 2018 08:19:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3AA6F1826; Thu, 4 Jan 2018 08:19:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Thu, 04 Jan 2018 08:19:52 -0000 Message-Id: In-Reply-To: <86c0613f44104a809c4413e1bed741fe@git.apache.org> References: <86c0613f44104a809c4413e1bed741fe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/38] hbase git commit: HBASE-19520 Add UTs for the new lock type PEER HBASE-19520 Add UTs for the new lock type PEER Signed-off-by: zhangduo Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f6dc608 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f6dc608 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f6dc608 Branch: refs/heads/HBASE-19397 Commit: 8f6dc6087689849bbc81fd15896e69cd144f4a8a Parents: 5d32c8c Author: Guanghao Zhang Authored: Wed Dec 20 16:43:38 2017 +0800 Committer: zhangduo Committed: Thu Jan 4 14:50:49 2018 +0800 ---------------------------------------------------------------------- .../procedure/MasterProcedureScheduler.java | 9 +- .../procedure/TestMasterProcedureScheduler.java | 65 ++++++++- ...TestMasterProcedureSchedulerConcurrency.java | 135 +++++++++++++++++++ 3 files changed, 201 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8f6dc608/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 8ff2d12..a25217c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -389,6 +389,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { while (tableIter.hasNext()) { count += tableIter.next().size(); } + + // Peer queues + final AvlTreeIterator peerIter = new AvlTreeIterator<>(peerMap); + while (peerIter.hasNext()) { + count += peerIter.next().size(); + } + return count; } @@ -1041,7 +1048,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @see #wakePeerExclusiveLock(Procedure, String) * @param procedure the procedure trying to acquire the lock * @param peerId peer to lock - * @return true if the procedure has to wait for the per to be available + * @return true if the procedure has to wait for the peer to be available */ public boolean waitPeerExclusiveLock(Procedure procedure, String peerId) { schedLock(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8f6dc608/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 0291165..fd77e1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -905,6 +905,27 @@ public class TestMasterProcedureScheduler { } } + public static class TestPeerProcedure extends TestProcedure implements PeerProcedureInterface { + private final String peerId; + private final PeerOperationType opType; + + public TestPeerProcedure(long procId, String peerId, PeerOperationType opType) { + super(procId); + this.peerId = peerId; + this.opType = opType; + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return opType; + } + } + private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception { LockProcedure procedure = new LockProcedure(); @@ -927,22 +948,19 @@ public class TestMasterProcedureScheduler { return createLockProcedure(LockType.SHARED, procId); } - private static void assertLockResource(LockedResource resource, - LockedResourceType resourceType, String resourceName) - { + private static void assertLockResource(LockedResource resource, LockedResourceType resourceType, + String resourceName) { assertEquals(resourceType, resource.getResourceType()); assertEquals(resourceName, resource.getResourceName()); } - private static void assertExclusiveLock(LockedResource resource, Procedure procedure) - { + private static void assertExclusiveLock(LockedResource resource, Procedure procedure) { assertEquals(LockType.EXCLUSIVE, resource.getLockType()); assertEquals(procedure, resource.getExclusiveLockOwnerProcedure()); assertEquals(0, resource.getSharedLockCount()); } - private static void assertSharedLock(LockedResource resource, int lockCount) - { + private static void assertSharedLock(LockedResource resource, int lockCount) { assertEquals(LockType.SHARED, resource.getLockType()); assertEquals(lockCount, resource.getSharedLockCount()); } @@ -1027,6 +1045,39 @@ public class TestMasterProcedureScheduler { } @Test + public void testListLocksPeer() throws Exception { + String peerId = "1"; + LockProcedure procedure = createExclusiveLockProcedure(4); + queue.waitPeerExclusiveLock(procedure, peerId); + + List locks = queue.getLocks(); + assertEquals(1, locks.size()); + + LockedResource resource = locks.get(0); + assertLockResource(resource, LockedResourceType.PEER, peerId); + assertExclusiveLock(resource, procedure); + assertTrue(resource.getWaitingProcedures().isEmpty()); + + // Try to acquire the exclusive lock again with same procedure + assertFalse(queue.waitPeerExclusiveLock(procedure, peerId)); + + // Try to acquire the exclusive lock again with new procedure + LockProcedure procedure2 = createExclusiveLockProcedure(5); + assertTrue(queue.waitPeerExclusiveLock(procedure2, peerId)); + + // Same peerId, still only has 1 LockedResource + locks = queue.getLocks(); + assertEquals(1, locks.size()); + + resource = locks.get(0); + assertLockResource(resource, LockedResourceType.PEER, peerId); + // LockedResource owner still is the origin procedure + assertExclusiveLock(resource, procedure); + // The new procedure should in the waiting list + assertEquals(1, resource.getWaitingProcedures().size()); + } + + @Test public void testListLocksWaiting() throws Exception { LockProcedure procedure1 = createExclusiveLockProcedure(1); queue.waitTableExclusiveLock(procedure1, TableName.valueOf("ns4", "table4")); http://git-wip-us.apache.org/repos/asf/hbase/blob/8f6dc608/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 2e8e52a..4e67a63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; +import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure; import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -63,6 +65,85 @@ public class TestMasterProcedureSchedulerConcurrency { queue.clear(); } + @Test + public void testConcurrentPeerOperations() throws Exception { + TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue); + + int NUM_ITEMS = 10; + int NUM_PEERS = 5; + AtomicInteger opsCount = new AtomicInteger(0); + for (int i = 0; i < NUM_PEERS; ++i) { + String peerId = String.format("test-peer-%04d", i); + for (int j = 1; j < NUM_ITEMS; ++j) { + procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD)); + opsCount.incrementAndGet(); + } + } + assertEquals(opsCount.get(), queue.size()); + + Thread[] threads = new Thread[NUM_PEERS * 2]; + HashSet concurrentPeers = new HashSet<>(); + ArrayList failures = new ArrayList<>(); + AtomicInteger concurrentCount = new AtomicInteger(0); + for (int i = 0; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (opsCount.get() > 0) { + try { + TestPeerProcedure proc = procSet.acquire(); + if (proc == null) { + queue.signalAll(); + if (opsCount.get() > 0) { + continue; + } + break; + } + + String peerId = proc.getPeerId(); + synchronized (concurrentPeers) { + assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId)); + } + assertTrue(opsCount.decrementAndGet() >= 0); + + try { + long procId = proc.getProcId(); + int concurrent = concurrentCount.incrementAndGet(); + assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_PEERS, + concurrent >= 1 && concurrent <= NUM_PEERS); + LOG.debug("[S] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent); + Thread.sleep(2000); + concurrent = concurrentCount.decrementAndGet(); + LOG.debug("[E] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent); + assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS); + } finally { + synchronized (concurrentPeers) { + assertTrue(concurrentPeers.remove(peerId)); + } + procSet.release(proc); + } + } catch (Throwable e) { + LOG.error("Failed " + e.getMessage(), e); + synchronized (failures) { + failures.add(e.getMessage()); + } + } finally { + queue.signalAll(); + } + } + } + }; + threads[i].start(); + } + + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + assertTrue(failures.toString(), failures.isEmpty()); + assertEquals(0, opsCount.get()); + assertEquals(0, queue.size()); + } + /** * Verify that "write" operations for a single table are serialized, * but different tables can be executed in parallel. @@ -221,4 +302,58 @@ public class TestMasterProcedureSchedulerConcurrency { return ((TableProcedureInterface)proc).getTableOperationType(); } } + + public static class TestPeerProcedureSet { + private final MasterProcedureScheduler queue; + + public TestPeerProcedureSet(final MasterProcedureScheduler queue) { + this.queue = queue; + } + + public void addBack(TestPeerProcedure proc) { + queue.addBack(proc); + } + + public TestPeerProcedure acquire() { + TestPeerProcedure proc = null; + boolean waiting = true; + while (waiting && queue.size() > 0) { + proc = (TestPeerProcedure) queue.poll(100000000L); + if (proc == null) { + continue; + } + switch (proc.getPeerOperationType()) { + case ADD: + case REMOVE: + case ENABLE: + case DISABLE: + case UPDATE_CONFIG: + waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId()); + break; + case REFRESH: + waiting = false; + break; + default: + throw new UnsupportedOperationException(); + } + } + return proc; + } + + public void release(TestPeerProcedure proc) { + switch (proc.getPeerOperationType()) { + case ADD: + case REMOVE: + case ENABLE: + case DISABLE: + case UPDATE_CONFIG: + queue.wakePeerExclusiveLock(proc, proc.getPeerId()); + break; + case REFRESH: + break; + default: + throw new UnsupportedOperationException(); + } + } + } }