Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DF724200D47 for ; Sat, 11 Nov 2017 01:06:45 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DDEF0160BF2; Sat, 11 Nov 2017 00:06:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D60CF160BEE for ; Sat, 11 Nov 2017 01:06:44 +0100 (CET) Received: (qmail 31998 invoked by uid 500); 11 Nov 2017 00:06:44 -0000 Mailing-List: contact notifications-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list notifications@asterixdb.apache.org Received: (qmail 31989 invoked by uid 99); 11 Nov 2017 00:06:44 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Nov 2017 00:06:44 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6250D1A4AA0 for ; Sat, 11 Nov 2017 00:06:43 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 4.072 X-Spam-Level: **** X-Spam-Status: No, score=4.072 tagged_above=-999 required=6.31 tests=[MISSING_HEADERS=1.207, REPLYTO_WITHOUT_TO_CC=1.946, SPF_FAIL=0.919] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id tg8KzE039jxZ for ; Sat, 11 Nov 2017 00:06:41 +0000 (UTC) Received: from vitalstatistix.ics.uci.edu (vitalstatistix.ics.uci.edu [128.195.52.38]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) 
with ESMTP id 7156A5FCE9 for ; Sat, 11 Nov 2017 00:06:40 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by vitalstatistix.ics.uci.edu (Postfix) with ESMTP id AA7DF10078A; Fri, 10 Nov 2017 16:06:39 -0800 (PST) Date: Fri, 10 Nov 2017 16:06:39 -0800 From: "abdullah alamoudi (Code Review)" Message-ID: Reply-To: bamousaa@gmail.com X-Gerrit-MessageType: newchange Subject: Change in asterixdb[release-0.9.3-pre-rc]: [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds X-Gerrit-Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5 X-Gerrit-ChangeURL: X-Gerrit-Commit: a377824ad054ff0270f0b698d3f8e99a17508aaf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.12.7 archived-at: Sat, 11 Nov 2017 00:06:46 -0000 abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2144 Change subject: [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds ...................................................................... [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds The mechanism used for allowing multiple concurrent index builds does not work if the first index build finishes before the other index builds. It relied on a write lock obtained by the first index builder and released by the last index builder. This is not allowed when using ReentrantReadWriteLock and will lead to an IllegalMonitorStateException, since the last thread to exit is not the thread that acquired the lock. 
Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5 --- M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java A asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java 2 files changed, 243 insertions(+), 24 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/44/2144/1 diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java index 31f2089..b8aabd8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -30,17 +30,24 @@ public class DatasetLock implements IMetadataLock { private final String key; + // The lock private final ReentrantReadWriteLock lock; - private final ReentrantReadWriteLock dsReadLock; - private final ReentrantReadWriteLock dsModifyLock; + // Used for lock upgrade operation + private final ReentrantReadWriteLock upgradeLock; + // Used for exclusive modification + private final ReentrantReadWriteLock modifyLock; + // The two counters below are used to ensure mutual exclusivity between index builds and modifications + // order of entry indexBuildCounter -> indexModifyCounter private final MutableInt indexBuildCounter; + private final MutableInt dsModifyCounter; public DatasetLock(String key) { this.key = key; lock = new ReentrantReadWriteLock(true); - dsReadLock = new ReentrantReadWriteLock(true); - dsModifyLock = new ReentrantReadWriteLock(true); + upgradeLock = new ReentrantReadWriteLock(true); + modifyLock = new ReentrantReadWriteLock(true); indexBuildCounter = new MutableInt(0); + dsModifyCounter = new MutableInt(0); } private void readLock() { @@ -72,62 +79,110 @@ } private void readReadLock() { - dsReadLock.readLock().lock(); + 
upgradeLock.readLock().lock(); } private void modifyReadLock() { // insert - dsModifyLock.readLock().lock(); + modifyLock.readLock().lock(); + incrementModifyCounter(); + } + + private void incrementModifyCounter() { + synchronized (indexBuildCounter) { + boolean interrupted = false; + while (indexBuildCounter.getValue() > 0) { + try { + indexBuildCounter.wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + synchronized (dsModifyCounter) { + dsModifyCounter.increment(); + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private void decrementModifyCounter() { + synchronized (indexBuildCounter) { + synchronized (dsModifyCounter) { + dsModifyCounter.decrement(); + } + indexBuildCounter.notifyAll(); + } } private void modifyReadUnlock() { // insert - dsModifyLock.readLock().unlock(); + decrementModifyCounter(); + modifyLock.readLock().unlock(); } private void readReadUnlock() { - dsReadLock.readLock().unlock(); + upgradeLock.readLock().unlock(); } private void readWriteUnlock() { - dsReadLock.writeLock().unlock(); + upgradeLock.writeLock().unlock(); } - private void modifySharedWriteLock() { + private void BuildIndexLock() { // Build index statement synchronized (indexBuildCounter) { if (indexBuildCounter.getValue() > 0) { indexBuildCounter.setValue(indexBuildCounter.getValue() + 1); } else { - dsModifyLock.writeLock().lock(); - indexBuildCounter.setValue(1); + boolean locked = false; + boolean interrupted = false; + while (!locked) { + synchronized (dsModifyCounter) { + if (dsModifyCounter.getValue() == 0) { + indexBuildCounter.setValue(indexBuildCounter.getValue() + 1); + locked = true; + } + } + if (!locked) { + try { + indexBuildCounter.wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } } } } - private void modifySharedWriteUnlock() { + private void BuildIndexUnlock() { // Build index statement synchronized 
(indexBuildCounter) { - if (indexBuildCounter.getValue() == 1) { - dsModifyLock.writeLock().unlock(); - } indexBuildCounter.setValue(indexBuildCounter.getValue() - 1); + indexBuildCounter.notifyAll(); } } private void modifyExclusiveWriteLock() { - dsModifyLock.writeLock().lock(); + modifyLock.writeLock().lock(); + incrementModifyCounter(); } private void modifyExclusiveWriteUnlock() { - dsModifyLock.writeLock().unlock(); + decrementModifyCounter(); + modifyLock.writeLock().unlock(); } @Override public void upgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException { if (from == IMetadataLock.Mode.EXCLUSIVE_MODIFY && to == IMetadataLock.Mode.UPGRADED_WRITE) { - dsReadLock.readLock().unlock(); - dsReadLock.writeLock().lock(); + upgradeLock.readLock().unlock(); + upgradeLock.writeLock().lock(); } else { throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to); } @@ -136,8 +191,8 @@ @Override public void downgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException { if (from == IMetadataLock.Mode.UPGRADED_WRITE && to == IMetadataLock.Mode.EXCLUSIVE_MODIFY) { - dsReadLock.writeLock().unlock(); - dsReadLock.readLock().lock(); + upgradeLock.writeLock().unlock(); + upgradeLock.readLock().lock(); } else { throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to); } @@ -148,7 +203,7 @@ switch (mode) { case INDEX_BUILD: readLock(); - modifySharedWriteLock(); + BuildIndexLock(); break; case MODIFY: readLock(); @@ -176,7 +231,7 @@ public void unlock(IMetadataLock.Mode mode) { switch (mode) { case INDEX_BUILD: - modifySharedWriteUnlock(); + BuildIndexUnlock(); readUnlock(); break; case MODIFY: diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java new file mode 100644 index 0000000..ec69912 --- /dev/null +++ 
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java @@ -0,0 +1,164 @@ +package org.apache.asterix.metadata.lock; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.common.metadata.LockList; +import org.apache.hyracks.api.util.SingleThreadEventProcessor; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MetadataLockManagerTest { + + static final int REPREAT_TEST_COUNT = 10; + + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]); + } + + private static class Request { + private enum Statement { + INDEX, + MODIFY + } + + private final Statement statement; + private final String dataset; + private boolean done; + private int step = 0; + + public Request(Statement statement, String dataset) { + this.statement = statement; + this.dataset = dataset; + done = false; + } + + Statement statement() { + return statement; + } + + String dataset() { + return dataset; + } + + synchronized void complete() { + done = true; + notifyAll(); + } + + synchronized void await() throws InterruptedException { + while (!done) { + wait(); + } + } + + synchronized void step() { + step++; + notifyAll(); + } + + synchronized int getSteps() { + return step; + } + + synchronized void await(int step) throws InterruptedException { + while (this.step < step) { + wait(); + } + } + } + + public class User extends SingleThreadEventProcessor { + + private MetadataLockManager lockManager; + private Semaphore step = new Semaphore(0); + private final LockList locks = new LockList(); + + public User(String username, MetadataLockManager lockManager) { + super(username); + this.lockManager = lockManager; + } + + public void step() { + step.release(); + } + + @Override + protected void handle(Request req) 
throws Exception { + try { + step.acquire(); + switch (req.statement()) { + case INDEX: + lockManager.acquireDatasetCreateIndexLock(locks, req.dataset()); + break; + case MODIFY: + lockManager.acquireDatasetModifyLock(locks, req.dataset()); + break; + default: + break; + } + req.step(); + step.acquire(); + } finally { + locks.reset(); + req.step(); + req.complete(); + } + } + + } + + @Test + public void testDatasetLockMultipleIndexBuilds() throws Exception { + MetadataLockManager lockManager = new MetadataLockManager(); + String dataset = "Dataset"; + User[] users = new User[3]; + Request[] requests = new Request[3]; + users[0] = new User("till", lockManager); + requests[0] = new Request(Request.Statement.INDEX, dataset); + users[1] = new User("dmitry", lockManager); + requests[1] = new Request(Request.Statement.INDEX, dataset); + users[2] = new User("mike", lockManager); + requests[2] = new Request(Request.Statement.MODIFY, dataset); + // Till builds an index + users[0].add(requests[0]); + // Dmitry builds an index + users[1].add(requests[1]); + // Mike Modifies + users[2].add(requests[2]); + // Till starts + users[0].step(); + // Ensure lock acquired + requests[0].await(1); + // Dmitry starts + users[1].step(); + // Ensure lock acquired + requests[1].await(1); + // Mike starts and is allowed to go all the way + users[2].step(); + users[2].step(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, requests[2].getSteps()); + // Till finishes first + users[0].step(); + // Ensure the request has been completed and lock has been released + requests[0].await(); + // Ensure that Mike still could not acquire locks + Assert.assertEquals(0, requests[2].getSteps()); + // Dmitry finishes second + users[1].step(); + // Ensure the request has been completed and lock has been released + requests[1].await(); + // Ensure that Mike could proceed and request has been completed + requests[2].await(); + // Stop users + for (int i = 0; i < users.length; i++) 
{ + users[i].stop(); + } + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/2144 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: release-0.9.3-pre-rc Gerrit-Owner: abdullah alamoudi