Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DA57D200D4B for ; Mon, 13 Nov 2017 07:20:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D8CAC160C05; Mon, 13 Nov 2017 06:20:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A8DC8160BF1 for ; Mon, 13 Nov 2017 07:20:52 +0100 (CET) Received: (qmail 20338 invoked by uid 500); 13 Nov 2017 06:20:51 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 20329 invoked by uid 99); 13 Nov 2017 06:20:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Nov 2017 06:20:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA4BCDFBCA; Mon, 13 Nov 2017 06:20:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: asterixdb git commit: [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds Date: Mon, 13 Nov 2017 06:20:51 +0000 (UTC) archived-at: Mon, 13 Nov 2017 06:20:54 -0000 Repository: asterixdb Updated Branches: refs/heads/release-0.9.3-pre-rc ed4b04680 -> 19a2b58c3 [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds - user model changes: no - storage format changes: no - interface changes: no details: - The mechanism used for allowing multiple concurrent index builds does not work if the 
first index build finishes before other index builds. It relied on a write lock obtained by the first index builder and released by the last index builder. This is not allowed when using ReentrantReadWriteLock and will lead to an IllegalMonitorStateException since the last thread to exit did not hold the lock. - A potential deadlock between modify and exclusive modify can happen when an exclusive modify attempts to upgrade the lock to a write lock while the modify lock waits for the exclusive lock to be released. This has been fixed too. - Test cases were added. Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2144 Integration-Tests: Jenkins Tested-by: Jenkins Contrib: Jenkins Reviewed-by: Michael Blow Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/19a2b58c Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/19a2b58c Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/19a2b58c Branch: refs/heads/release-0.9.3-pre-rc Commit: 19a2b58c32d56060a8167a2ad69bb942083ba4c1 Parents: ed4b046 Author: Abdullah Alamoudi Authored: Sun Nov 12 17:21:34 2017 -0800 Committer: abdullah alamoudi Committed: Sun Nov 12 22:19:43 2017 -0800 ---------------------------------------------------------------------- .../apache/asterix/app/active/RecoveryTask.java | 5 +- .../asterix/metadata/lock/DatasetLock.java | 111 ++++--- .../metadata/lock/MetadataLockManagerTest.java | 307 +++++++++++++++++++ 3 files changed, 384 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19a2b58c/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java index 29b38ba..0321ae4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -134,8 +134,9 @@ public class RecoveryTask { lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName()); for (Dataset dataset : listener.getDatasets()) { - MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), - dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset)); + lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName()); + lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); } synchronized (listener) { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19a2b58c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java index 31f2089..bc302c2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -24,23 +24,31 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.metadata.IMetadataLock; +import org.apache.asterix.common.utils.InterruptUtil; import org.apache.commons.lang3.mutable.MutableInt; import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; public class DatasetLock implements IMetadataLock { private final String key; + // The lock private final ReentrantReadWriteLock lock; - private final ReentrantReadWriteLock dsReadLock; - private final ReentrantReadWriteLock dsModifyLock; + // Used for lock upgrade operation + private final ReentrantReadWriteLock upgradeLock; + // Used for exclusive modification + private final ReentrantReadWriteLock modifyLock; + // The two counters below are used to ensure mutual exclusivity between index builds and modifications + // order of entry indexBuildCounter -> indexModifyCounter private final MutableInt indexBuildCounter; + private final MutableInt dsModifyCounter; public DatasetLock(String key) { this.key = key; lock = new ReentrantReadWriteLock(true); - dsReadLock = new ReentrantReadWriteLock(true); - dsModifyLock = new ReentrantReadWriteLock(true); + upgradeLock = new ReentrantReadWriteLock(true); + modifyLock = new ReentrantReadWriteLock(true); indexBuildCounter = new MutableInt(0); + dsModifyCounter = new MutableInt(0); } private void readLock() { @@ -71,63 +79,97 @@ public class DatasetLock implements IMetadataLock { lock.writeLock().unlock(); } - private void readReadLock() { - dsReadLock.readLock().lock(); + private void upgradeReadLock() { + upgradeLock.readLock().lock(); } private void modifyReadLock() { // insert - dsModifyLock.readLock().lock(); + modifyLock.readLock().lock(); + incrementModifyCounter(); + } + + private void incrementModifyCounter() { + InterruptUtil.doUninterruptibly(() -> { + synchronized (indexBuildCounter) { + while (indexBuildCounter.getValue() > 0) { + indexBuildCounter.wait(); + } + synchronized (dsModifyCounter) { + dsModifyCounter.increment(); + } + } + }); + } + + private void decrementModifyCounter() { + synchronized (indexBuildCounter) { + synchronized (dsModifyCounter) { + if (dsModifyCounter.decrementAndGet() == 0) { + indexBuildCounter.notifyAll(); + } + } + 
} } private void modifyReadUnlock() { // insert - dsModifyLock.readLock().unlock(); + decrementModifyCounter(); + modifyLock.readLock().unlock(); } - private void readReadUnlock() { - dsReadLock.readLock().unlock(); + private void upgradeReadUnlock() { + upgradeLock.readLock().unlock(); } - private void readWriteUnlock() { - dsReadLock.writeLock().unlock(); + private void upgradeWriteUnlock() { + upgradeLock.writeLock().unlock(); } - private void modifySharedWriteLock() { + private void buildIndexLock() { // Build index statement synchronized (indexBuildCounter) { if (indexBuildCounter.getValue() > 0) { - indexBuildCounter.setValue(indexBuildCounter.getValue() + 1); + indexBuildCounter.increment(); } else { - dsModifyLock.writeLock().lock(); - indexBuildCounter.setValue(1); + InterruptUtil.doUninterruptibly(() -> { + while (true) { + synchronized (dsModifyCounter) { + if (dsModifyCounter.getValue() == 0) { + indexBuildCounter.increment(); + return; + } + } + indexBuildCounter.wait(); + } + }); } } } - private void modifySharedWriteUnlock() { + private void buildIndexUnlock() { // Build index statement synchronized (indexBuildCounter) { - if (indexBuildCounter.getValue() == 1) { - dsModifyLock.writeLock().unlock(); + if (indexBuildCounter.decrementAndGet() == 0) { + indexBuildCounter.notifyAll(); } - indexBuildCounter.setValue(indexBuildCounter.getValue() - 1); } } - private void modifyExclusiveWriteLock() { - dsModifyLock.writeLock().lock(); + private void modifyWriteLock() { + modifyLock.writeLock().lock(); + incrementModifyCounter(); } private void modifyExclusiveWriteUnlock() { - dsModifyLock.writeLock().unlock(); + decrementModifyCounter(); + modifyLock.writeLock().unlock(); } @Override public void upgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException { if (from == IMetadataLock.Mode.EXCLUSIVE_MODIFY && to == IMetadataLock.Mode.UPGRADED_WRITE) { - dsReadLock.readLock().unlock(); - dsReadLock.writeLock().lock(); + 
upgradeLock.writeLock().lock(); } else { throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to); } @@ -136,8 +178,7 @@ public class DatasetLock implements IMetadataLock { @Override public void downgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException { if (from == IMetadataLock.Mode.UPGRADED_WRITE && to == IMetadataLock.Mode.EXCLUSIVE_MODIFY) { - dsReadLock.writeLock().unlock(); - dsReadLock.readLock().lock(); + upgradeLock.writeLock().unlock(); } else { throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to); } @@ -148,24 +189,22 @@ public class DatasetLock implements IMetadataLock { switch (mode) { case INDEX_BUILD: readLock(); - modifySharedWriteLock(); + buildIndexLock(); break; case MODIFY: readLock(); - readReadLock(); modifyReadLock(); break; case EXCLUSIVE_MODIFY: readLock(); - readReadLock(); - modifyExclusiveWriteLock(); + modifyWriteLock(); break; case WRITE: writeLock(); break; case READ: readLock(); - readReadLock(); + upgradeReadLock(); break; default: throw new IllegalStateException("locking mode " + mode + " is not supported"); @@ -176,28 +215,26 @@ public class DatasetLock implements IMetadataLock { public void unlock(IMetadataLock.Mode mode) { switch (mode) { case INDEX_BUILD: - modifySharedWriteUnlock(); + buildIndexUnlock(); readUnlock(); break; case MODIFY: modifyReadUnlock(); - readReadUnlock(); readUnlock(); break; case EXCLUSIVE_MODIFY: modifyExclusiveWriteUnlock(); - readReadUnlock(); readUnlock(); break; case WRITE: writeUnlock(); break; case READ: - readReadUnlock(); + upgradeReadUnlock(); readUnlock(); break; case UPGRADED_WRITE: - readWriteUnlock(); + upgradeWriteUnlock(); modifyExclusiveWriteUnlock(); readUnlock(); break; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19a2b58c/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java ---------------------------------------------------------------------- diff 
--git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java new file mode 100644 index 0000000..4382860 --- /dev/null +++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.lock; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.common.metadata.LockList; +import org.apache.hyracks.api.util.SingleThreadEventProcessor; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MetadataLockManagerTest { + + static final int REPREAT_TEST_COUNT = 3; + + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]); + } + + private static class Request { + private enum Statement { + INDEX, + MODIFY, + EXCLUSIVE_MODIFY, + EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE, + EXCLUSIVE_MODIFY_UPGRADE, + } + + private final Statement statement; + private final String dataset; + private boolean done; + private int step = 0; + + public Request(Statement statement, String dataset) { + this.statement = statement; + this.dataset = dataset; + done = false; + } + + Statement statement() { + return statement; + } + + String dataset() { + return dataset; + } + + synchronized void complete() { + done = true; + notifyAll(); + } + + synchronized void await() throws InterruptedException { + while (!done) { + wait(); + } + } + + synchronized void step() { + step++; + notifyAll(); + } + + synchronized int getSteps() { + return step; + } + + synchronized void await(int step) throws InterruptedException { + while (this.step < step) { + wait(); + } + } + } + + public class User extends SingleThreadEventProcessor { + + private MetadataLockManager lockManager; + private Semaphore step = new Semaphore(0); + private final LockList locks = new LockList(); + + public User(String username, MetadataLockManager lockManager) { + super(username); + this.lockManager = lockManager; + } + + public void step() { + step.release(); + } + + @Override + protected void handle(Request req) throws Exception { + try { + 
step.acquire(); + switch (req.statement()) { + case INDEX: + lockManager.acquireDatasetCreateIndexLock(locks, req.dataset()); + break; + case MODIFY: + lockManager.acquireDatasetModifyLock(locks, req.dataset()); + break; + case EXCLUSIVE_MODIFY: + lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset()); + break; + case EXCLUSIVE_MODIFY_UPGRADE: + lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset()); + req.step(); + step.acquire(); + lockManager.upgradeDatasetLockToWrite(locks, req.dataset()); + break; + case EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE: + lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset()); + req.step(); + step.acquire(); + lockManager.upgradeDatasetLockToWrite(locks, req.dataset()); + req.step(); + step.acquire(); + lockManager.downgradeDatasetLockToExclusiveModify(locks, req.dataset()); + break; + default: + break; + } + req.step(); + step.acquire(); + } finally { + locks.reset(); + req.step(); + req.complete(); + } + } + + } + + @Test + public void testDatasetLockMultipleIndexBuildsSingleModifier() throws Exception { + MetadataLockManager lockManager = new MetadataLockManager(); + String dataset = "Dataset"; + User till = new User("till", lockManager); + Request tReq = new Request(Request.Statement.INDEX, dataset); + User dmitry = new User("dmitry", lockManager); + Request dReq = new Request(Request.Statement.INDEX, dataset); + User mike = new User("mike", lockManager); + Request mReq = new Request(Request.Statement.MODIFY, dataset); + // Till builds an index + till.add(tReq); + // Dmitry builds an index + dmitry.add(dReq); + // Mike modifies + mike.add(mReq); + // Till starts + till.step(); + // Ensure lock acquired + tReq.await(1); + // Dmitry starts + dmitry.step(); + // Ensure lock acquired + dReq.await(1); + // Mike starts and is allowed to go all the way + mike.step(); + mike.step(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, mReq.getSteps()); + // Till 
finishes first + till.step(); + // Ensure the request has been completed and lock has been released + tReq.await(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, mReq.getSteps()); + // Dmitry finishes second + dmitry.step(); + // Ensure the request has been completed and lock has been released + dReq.await(); + // Ensure that Mike could proceed and request has been completed + mReq.await(); + // Stop users + till.stop(); + dmitry.stop(); + mike.stop(); + } + + @Test + public void testDatasetLockMultipleModifiersSingleIndexBuilder() throws Exception { + MetadataLockManager lockManager = new MetadataLockManager(); + String dataset = "Dataset"; + User till = new User("till", lockManager); + Request tReq = new Request(Request.Statement.MODIFY, dataset); + User dmitry = new User("dmitry", lockManager); + Request dReq = new Request(Request.Statement.MODIFY, dataset); + User mike = new User("mike", lockManager); + Request mReq = new Request(Request.Statement.INDEX, dataset); + // Till modifies + till.add(tReq); + // Dmitry modifies + dmitry.add(dReq); + // Mike builds an index + mike.add(mReq); + // Till starts + till.step(); + // Ensure lock acquired + tReq.await(1); + // Dmitry starts + dmitry.step(); + // Ensure lock acquired + dReq.await(1); + // Mike starts and is allowed to go all the way + mike.step(); + mike.step(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, mReq.getSteps()); + // Till finishes first + till.step(); + // Ensure the request has been completed and lock has been released + tReq.await(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, mReq.getSteps()); + // Dmitry finishes second + dmitry.step(); + // Ensure the request has been completed and lock has been released + dReq.await(); + // Ensure that Mike could proceed and request has been completed + mReq.await(); + // Stop users + till.stop(); + dmitry.stop(); + mike.stop(); + } + + @Test + public void 
testDatasetLockMultipleModifiersSingleExclusiveModifier() throws Exception { + MetadataLockManager lockManager = new MetadataLockManager(); + String dataset = "Dataset"; + User till = new User("till", lockManager); + Request tReq = new Request(Request.Statement.MODIFY, dataset); + User dmitry = new User("dmitry", lockManager); + Request dReq = new Request(Request.Statement.MODIFY, dataset); + User mike = new User("mike", lockManager); + Request mReq = new Request(Request.Statement.EXCLUSIVE_MODIFY, dataset); + // Till starts + till.add(tReq); + till.step(); + // Ensure lock is acquired + tReq.await(1); + // Mike starts + mike.add(mReq); + mike.step(); + // Sleep for 1s for now as there is no way to find out user has submitted the exclusive lock request + Thread.sleep(1000); + // Ensure that Mike didn't get the lock + Assert.assertEquals(0, mReq.getSteps()); + // Dmitry starts + dmitry.add(dReq); + dmitry.step(); + // Ensure that Dmitry didn't get the lock + Assert.assertEquals(0, dReq.getSteps()); + // Till proceeds + till.step(); + // Ensure the request has been completed and lock has been released + tReq.await(); + // Ensure that Mike got the lock + mReq.await(1); + // Till submits another request + tReq = new Request(Request.Statement.MODIFY, dataset); + till.add(tReq); + till.step(); + // Ensure that Till didn't get the lock + Assert.assertEquals(0, tReq.getSteps()); + // Ensure that Dmitry didn't get the lock + Assert.assertEquals(0, dReq.getSteps()); + // Mike completes + mike.step(); + mReq.await(); + // Ensure that both Till and Dmitry got the lock + tReq.await(1); + dReq.await(1); + till.step(); + dmitry.step(); + // Ensure that both Till and Dmitry complete + tReq.await(); + dReq.await(); + // Stop users + till.stop(); + dmitry.stop(); + mike.stop(); + } + +}