Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7848F1847C for ; Wed, 18 Nov 2015 22:37:14 +0000 (UTC) Received: (qmail 83017 invoked by uid 500); 18 Nov 2015 22:37:14 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 82979 invoked by uid 500); 18 Nov 2015 22:37:14 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 82969 invoked by uid 99); 18 Nov 2015 22:37:14 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Nov 2015 22:37:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id B46B6180185 for ; Wed, 18 Nov 2015 22:37:13 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id ONgEDS3JitJL for ; Wed, 18 Nov 2015 22:37:01 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 99E9E20B89 for ; Wed, 18 Nov 2015 22:36:59 +0000 (UTC) Received: (qmail 81237 invoked by uid 99); 18 Nov 2015 22:36:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Nov 2015 22:36:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D1D9DFC77; Wed, 18 Nov 2015 22:36:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tillw@apache.org To: commits@asterixdb.incubator.apache.org Message-Id: <290f6f1d0e81483196d0a1fb999f45bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-asterixdb git commit: ASTERIXDB-1118: allow for lock conversion Date: Wed, 18 Nov 2015 22:36:58 +0000 (UTC) Repository: incubator-asterixdb Updated Branches: refs/heads/master 340332e20 -> f8daac233 ASTERIXDB-1118: allow for lock conversion Also improve debugability of ConcurrentLockManager and add new unit tests. Change-Id: If49ed8d48fa8c71a52c880d4f42a2badbe6a57d7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/474 Reviewed-by: Taewoo Kim Tested-by: Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/f8daac23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/f8daac23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/f8daac23 Branch: refs/heads/master Commit: f8daac2331ecda01bfb7fde67adb3dcc010add4d Parents: 340332e Author: Till Westmann Authored: Wed Nov 18 13:44:40 2015 -0800 Committer: Till Westmann Committed: Wed Nov 18 14:33:25 2015 -0800 ---------------------------------------------------------------------- asterix-transactions/pom.xml | 2 +- .../service/locking/ConcurrentLockManager.java | 400 +++++++------------ .../service/locking/DumpTablePrinter.java | 94 +++++ .../LockManagerDeterministicUnitTest.java | 2 - .../service/locking/ResourceGroup.java | 95 +++++ .../service/locking/ResourceGroupTable.java | 82 ++++ .../service/locking/ResourceTablePrinter.java | 113 ++++++ .../service/locking/TablePrinter.java | 24 ++ .../locking/WaitInterruptedException.java | 28 ++ .../transaction/TransactionSubsystem.java | 4 +- .../service/locking/LockManagerUnitTest.java | 398 ++++++++++++++++++ .../management/service/locking/Locker.java | 187 +++++++++ .../management/service/locking/Request.java | 166 ++++++++ 13 files changed, 1332 insertions(+), 263 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/pom.xml ---------------------------------------------------------------------- diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml index e5ad1b4..ae4aaa5 100644 --- a/asterix-transactions/pom.xml +++ b/asterix-transactions/pom.xml @@ -116,6 +116,6 @@ mockito-all 1.10.19 - + http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java index 3114195..af508f5 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java @@ -19,6 +19,14 @@ package org.apache.asterix.transaction.management.service.locking; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; + import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -31,30 +39,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.config.AsterixTransactionProperties; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.DatasetId; -import org.apache.asterix.common.transactions.ILockManager; -import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; -import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; -import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; - /** - * An implementation of the ILockManager interface. + * A concurrent implementation of the ILockManager interface. * - * @author tillw + * @see ResourceGroupTable + * @see ResourceGroup */ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent { - private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName()); - private static final Level LVL = Level.FINER; + static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName()); + static final Level LVL = Level.FINER; public static final boolean DEBUG_MODE = false;//true public static final boolean CHECK_CONSISTENCY = false; - private TransactionSubsystem txnSubsystem; private ResourceGroupTable table; private ResourceArenaManager resArenaMgr; private RequestArenaManager reqArenaMgr; @@ -81,22 +79,21 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent static LockAction[][] ACTION_MATRIX = { // new NL IS IX S X - { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL - { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS - { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX - { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S - { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X + {LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD}, // NL + {LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT}, // IS + {LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT}, // IX + {LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT}, // S + {LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT} // X }; - public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException { - this.txnSubsystem = txnSubsystem; - - this.table = new ResourceGroupTable(); - - final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer(); - - int noArenas = Runtime.getRuntime().availableProcessors() * 2; + public ConcurrentLockManager(final int lockManagerShrinkTimer) throws ACIDException { + this(lockManagerShrinkTimer, Runtime.getRuntime().availableProcessors() * 2, 1024); + // TODO increase table size? + } + public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize) throws + ACIDException { + this.table = new ResourceGroupTable(tableSize); resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer); reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer); jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer); @@ -108,10 +105,6 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent }; } - public AsterixTransactionProperties getTransactionProperties() { - return this.txnSubsystem.getTransactionProperties(); - } - @Override public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException { @@ -142,6 +135,18 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent while (!locked) { final LockAction act = determineLockAction(resSlot, jobSlot, lockMode); switch (act) { + case CONV: + if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) { + DeadlockTracker tracker = new CollectingTracker(); + tracker.pushJob(jobSlot); + introducesDeadlock(resSlot, jobSlot, tracker); + requestAbort(txnContext, tracker.toString()); + break; + } else if (hasOtherHolders(resSlot, jobSlot)) { + enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext); + break; + } + //no break case UPD: resArenaMgr.setMaxMode(resSlot, lockMode); // no break @@ -150,7 +155,6 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent locked = true; break; case WAIT: - case CONV: enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext); break; case ERR: @@ -161,6 +165,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent if (entityHashValue == -1) { dsLockCache.get().put(jobId, dsId, lockMode); } + } catch (InterruptedException e) { + throw new WaitInterruptedException(txnContext, "interrupted", e); } finally { group.releaseLatch(); } @@ -170,7 +176,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot, - final LockAction act, ITransactionContext txnContext) throws ACIDException { + final LockAction act, ITransactionContext txnContext) throws ACIDException, + InterruptedException { final Queue queue = act.modify ? upgrader : waiter; if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) { DeadlockTracker tracker = new CollectingTracker(); @@ -214,6 +221,9 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } static class CollectingTracker implements DeadlockTracker { + + static final boolean DEBUG = false; + ArrayList slots = new ArrayList(); ArrayList types = new ArrayList(); @@ -221,26 +231,26 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent public void pushResource(long resSlot) { types.add("Resource"); slots.add(resSlot); - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); } @Override public void pushRequest(long reqSlot) { types.add("Request"); slots.add(reqSlot); - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); } @Override public void pushJob(long jobSlot) { types.add("Job"); slots.add(jobSlot); - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); } @Override public void pop() { - System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + if (DEBUG) System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); types.remove(types.size() - 1); slots.remove(slots.size() - 1); } @@ -257,15 +267,19 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent /** * determine if adding a job to the waiters of a resource will introduce a - * cycle in the wait-graph where the job waits on itself - * - * @param resSlot - * the slot that contains the information about the resource - * @param jobSlot - * the slot that contains the information about the job + * cycle in the wait-graph where the job waits on itself - but not directly on itself (which happens e.g. in the + * case of upgrading a lock from S to X). + * + * @param resSlot the slot that contains the information about the resource + * @param jobSlot the slot that contains the information about the job * @return true if a cycle would be introduced, false otherwise */ private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker) { + return introducesDeadlock(resSlot, jobSlot, tracker, 0); + } + + private boolean introducesDeadlock(final long resSlot, final long jobSlot, + final DeadlockTracker tracker, final int depth) { synchronized (jobArenaMgr) { tracker.pushResource(resSlot); long reqSlot = resArenaMgr.getLastHolder(resSlot); @@ -273,14 +287,22 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent tracker.pushRequest(reqSlot); final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot); tracker.pushJob(holderJobSlot); - if (holderJobSlot == jobSlot) { + if (holderJobSlot == jobSlot && depth != 0) { return true; } + + // To determine if we have a deadlock we need to look at the waiters and at the upgraders. + // The scanWaiters flag indicates if we are currently scanning the waiters (true) or the upgraders + // (false). boolean scanWaiters = true; long waiter = jobArenaMgr.getLastWaiter(holderJobSlot); + if (waiter < 0 && scanWaiters) { + scanWaiters = false; + waiter = jobArenaMgr.getLastUpgrader(holderJobSlot); + } while (waiter >= 0) { - long watingOnResSlot = reqArenaMgr.getResourceId(waiter); - if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) { + long waitingOnResSlot = reqArenaMgr.getResourceId(waiter); + if (introducesDeadlock(waitingOnResSlot, jobSlot, tracker, depth + 1)) { return true; } waiter = reqArenaMgr.getNextJobRequest(waiter); @@ -289,6 +311,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent waiter = jobArenaMgr.getLastUpgrader(holderJobSlot); } } + tracker.pop(); // job tracker.pop(); // request reqSlot = reqArenaMgr.getNextRequest(reqSlot); @@ -354,6 +377,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent throw new IllegalStateException(); } } + } catch (InterruptedException e) { + throw new WaitInterruptedException(txnContext, "interrupted", e); } finally { if (reqSlot != -1) { // deallocate request, if we allocated one earlier @@ -422,7 +447,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent @Override public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode, - ITransactionContext txnContext) throws ACIDException { + ITransactionContext txnContext) throws ACIDException { log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext); stats.instantTryLock(); @@ -524,17 +549,15 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent final int oldMaxMode = resArenaMgr.getMaxMode(resource); final int newMaxMode = determineNewMaxMode(resource, oldMaxMode); resArenaMgr.setMaxMode(resource, newMaxMode); - if (oldMaxMode != newMaxMode) { - // the locking mode didn't change, current waiters won't be - // able to acquire the lock, so we do not need to signal them - group.wakeUp(); - } + group.wakeUp(); } } finally { group.releaseLatch(); } - // dataset intention locks are cleaned up at the end of the job + // dataset intention locks are + // a) kept in dsLockCache and + // b) cleaned up only in releaseLocks at the end of the job } @Override @@ -606,13 +629,15 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent resArenaMgr.setPkHashVal(resSlot, entityHashValue); resArenaMgr.setNext(resSlot, group.firstResourceIndex.get()); group.firstResourceIndex.set(resSlot); - if (DEBUG_MODE) - LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue - + ")"); + if (DEBUG_MODE) { + LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + + entityHashValue + ")"); + } } else { - if (DEBUG_MODE) - LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue - + ")"); + if (DEBUG_MODE) { + LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + + entityHashValue + ")"); + } } return resSlot; } @@ -644,13 +669,10 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent * a) (wait and) convert the lock once conversion becomes viable or * b) acquire the lock if we want to lock the same resource with the same * lock mode for the same job. - * - * @param resource - * the resource slot that's being locked - * @param job - * the job slot of the job locking the resource - * @param lockMode - * the lock mode that the resource should be locked with + * + * @param resource the resource slot that's being locked + * @param job the job slot of the job locking the resource + * @param lockMode the lock mode that the resource should be locked with * @return */ private LockAction updateActionForSameJob(long resource, long job, byte lockMode) { @@ -698,6 +720,17 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } } + private boolean hasOtherHolders(long resSlot, long jobSlot) { + long holder = resArenaMgr.getLastHolder(resSlot); + while (holder != -1) { + if (reqArenaMgr.getJobSlot(holder) != jobSlot) { + return true; + } + holder = reqArenaMgr.getNextRequest(holder); + } + return false; + } + private long removeLastHolder(long resource, long jobSlot, byte lockMode) { long holder = resArenaMgr.getLastHolder(resource); if (holder < 0) { @@ -848,13 +881,10 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent * remove the first request for a given job and lock mode from a request queue. * If the value of the parameter lockMode is LockMode.ANY the first request * for the job is removed - independent of the LockMode. - * - * @param head - * the head of the request queue - * @param jobSlot - * the job slot - * @param lockMode - * the lock mode + * + * @param head the head of the request queue + * @param jobSlot the job slot + * @param lockMode the lock mode * @return the slot of the first request that matched the given job */ private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) { @@ -947,30 +977,34 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } private void assertLocksCanBefoundInJobQueue() throws ACIDException { - for (int i = 0; i < ResourceGroupTable.TABLE_SIZE; ++i) { - final ResourceGroup group = table.get(i); - if (group.tryLatch(100, TimeUnit.MILLISECONDS)) { - try { - long resSlot = group.firstResourceIndex.get(); - while (resSlot != -1) { - int dsId = resArenaMgr.getDatasetId(resSlot); - int entityHashValue = resArenaMgr.getPkHashVal(resSlot); - long reqSlot = resArenaMgr.getLastHolder(resSlot); - while (reqSlot != -1) { - byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot); - long jobSlot = reqArenaMgr.getJobSlot(reqSlot); - int jobId = jobArenaMgr.getJobId(jobSlot); - assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId); - reqSlot = reqArenaMgr.getNextRequest(reqSlot); + try { + for (int i = 0; i < table.size; ++i) { + final ResourceGroup group = table.get(i); + if (group.tryLatch(100, TimeUnit.MILLISECONDS)) { + try { + long resSlot = group.firstResourceIndex.get(); + while (resSlot != -1) { + int dsId = resArenaMgr.getDatasetId(resSlot); + int entityHashValue = resArenaMgr.getPkHashVal(resSlot); + long reqSlot = resArenaMgr.getLastHolder(resSlot); + while (reqSlot != -1) { + byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot); + long jobSlot = reqArenaMgr.getJobSlot(reqSlot); + int jobId = jobArenaMgr.getJobId(jobSlot); + assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId); + reqSlot = reqArenaMgr.getNextRequest(reqSlot); + } + resSlot = resArenaMgr.getNext(resSlot); } - resSlot = resArenaMgr.getNext(resSlot); + } finally { + group.releaseLatch(); } - } finally { - group.releaseLatch(); + } else { + LOGGER.warning("Could not check locks for " + group); } - } else { - LOGGER.warning("Could not check locks for " + group); } + } catch (InterruptedException e) { + throw new IllegalStateException("interrupted", e); } } @@ -986,7 +1020,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent /** * tries to find a lock request searching though the job queue - * + * * @param dsId * dataset id * @param entityHashValue @@ -1021,66 +1055,30 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent return -1; } - private String resQueueToString(long resSlot) { - return appendResQueue(new StringBuilder(), resSlot).toString(); + private TablePrinter getResourceTablePrinter() { + return new ResourceTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr); } - private StringBuilder appendResQueue(StringBuilder sb, long resSlot) { - resArenaMgr.appendRecord(sb, resSlot); - sb.append("\n"); - appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot)); - return sb; + private TablePrinter getDumpTablePrinter() { + return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, jobIdSlotMap); } - private StringBuilder appendReqQueue(StringBuilder sb, long head) { - while (head != -1) { - reqArenaMgr.appendRecord(sb, head); - sb.append("\n"); - head = reqArenaMgr.getNextRequest(head); - } - return sb; + public String printByResource() { + return getResourceTablePrinter().append(new StringBuilder()).append("\n").toString(); } - public StringBuilder append(StringBuilder sb) { - table.getAllLatches(); - try { - sb.append(">>dump_begin\t>>----- [resTable] -----\n"); - table.append(sb); - sb.append(">>dump_end\t>>----- [resTable] -----\n"); - - sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n"); - resArenaMgr.append(sb); - sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n"); - - sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n"); - reqArenaMgr.append(sb); - sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n"); - - sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n"); - for (Integer i : jobIdSlotMap.keySet()) { - sb.append(i).append(" : "); - TypeUtil.Global.append(sb, jobIdSlotMap.get(i)); - sb.append("\n"); - } - sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n"); - - sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n"); - jobArenaMgr.append(sb); - sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n"); - } finally { - table.releaseAllLatches(); - } - return sb; + public String toString() { + return printByResource(); } - public String toString() { - return append(new StringBuilder()).toString(); + public String dump() { + return getDumpTablePrinter().append(new StringBuilder()).toString(); } @Override public String prettyPrint() throws ACIDException { StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n"); - return append(s).toString() + "\n"; + return getDumpTablePrinter().append(s).toString() + "\n"; } @Override @@ -1090,7 +1088,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent @Override public void dumpState(OutputStream os) throws IOException { - os.write(toString().getBytes()); + os.write(dump().getBytes()); } @Override @@ -1140,119 +1138,5 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } } - private static class ResourceGroupTable { - public static final int TABLE_SIZE = 1024; // TODO increase? - - private ResourceGroup[] table; - - public ResourceGroupTable() { - table = new ResourceGroup[TABLE_SIZE]; - for (int i = 0; i < TABLE_SIZE; ++i) { - table[i] = new ResourceGroup(); - } - } - - ResourceGroup get(int dId, int entityHashValue) { - // TODO ensure good properties of hash function - int h = Math.abs(dId ^ entityHashValue); - if (h < 0) - h = 0; - return table[h % TABLE_SIZE]; - } - - ResourceGroup get(int i) { - return table[i]; - } - - public void getAllLatches() { - for (int i = 0; i < TABLE_SIZE; ++i) { - table[i].getLatch(); - } - } - - public void releaseAllLatches() { - for (int i = 0; i < TABLE_SIZE; ++i) { - table[i].releaseLatch(); - } - } - - public StringBuilder append(StringBuilder sb) { - return append(sb, false); - } - - public StringBuilder append(StringBuilder sb, boolean detail) { - for (int i = 0; i < table.length; ++i) { - sb.append(i).append(" : "); - if (detail) { - sb.append(table[i]); - } else { - sb.append(table[i].firstResourceIndex); - } - sb.append('\n'); - } - return sb; - } - } - - private static class ResourceGroup { - private ReentrantReadWriteLock latch; - private Condition condition; - AtomicLong firstResourceIndex; - - ResourceGroup() { - latch = new ReentrantReadWriteLock(); - condition = latch.writeLock().newCondition(); - firstResourceIndex = new AtomicLong(-1); - } - - void getLatch() { - log("latch"); - latch.writeLock().lock(); - } - - boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException { - log("tryLatch"); - try { - return latch.writeLock().tryLock(timeout, unit); - } catch (InterruptedException e) { - LOGGER.finer("interrupted while wating on ResourceGroup"); - throw new ACIDException("interrupted", e); - } - } - - void releaseLatch() { - log("release"); - latch.writeLock().unlock(); - } - - boolean hasWaiters() { - return latch.hasQueuedThreads(); - } - - void await(ITransactionContext txnContext) throws ACIDException { - log("wait for"); - try { - condition.await(); - } catch (InterruptedException e) { - LOGGER.finer("interrupted while wating on ResourceGroup"); - throw new ACIDException(txnContext, "interrupted", e); - } - } - - void wakeUp() { - log("notify"); - condition.signalAll(); - } - - void log(String s) { - if (LOGGER.isLoggable(LVL)) { - LOGGER.log(LVL, s + " " + toString()); - } - } - - public String toString() { - return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) - + ", waiters : " + (hasWaiters() ? "true" : "false") + " }"; - } - } } + http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java new file mode 100644 index 0000000..ffdb151 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.transaction.management.service.locking; + +import java.util.concurrent.ConcurrentHashMap; + +public class DumpTablePrinter implements TablePrinter { + private ResourceGroupTable table; + private ResourceArenaManager resArenaMgr; + private RequestArenaManager reqArenaMgr; + private JobArenaManager jobArenaMgr; + private ConcurrentHashMap jobIdSlotMap; + + DumpTablePrinter(ResourceGroupTable table, + ResourceArenaManager resArenaMgr, + RequestArenaManager reqArenaMgr, + JobArenaManager jobArenaMgr, + ConcurrentHashMap jobIdSlotMap) { + this.table = table; + this.resArenaMgr = resArenaMgr; + this.reqArenaMgr = reqArenaMgr; + this.jobArenaMgr = jobArenaMgr; + this.jobIdSlotMap = jobIdSlotMap; + } + + public StringBuilder append(StringBuilder sb) { + table.getAllLatches(); + try { + sb.append(">>dump_begin\t>>----- [resTable] -----\n"); + table.append(sb); + sb.append(">>dump_end\t>>----- [resTable] -----\n"); + + sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n"); + resArenaMgr.append(sb); + sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n"); + + sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n"); + reqArenaMgr.append(sb); + sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n"); + + sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n"); + for (Integer i : jobIdSlotMap.keySet()) { + sb.append(i).append(" : "); + TypeUtil.Global.append(sb, jobIdSlotMap.get(i)); + sb.append("\n"); + } + sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n"); + + sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n"); + jobArenaMgr.append(sb); + sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n"); + } finally { + table.releaseAllLatches(); + } + return sb; + } + + String resQueueToString(long resSlot) { + return appendResQueue(new StringBuilder(), resSlot).toString(); + } + + StringBuilder appendResQueue(StringBuilder sb, long resSlot) { + resArenaMgr.appendRecord(sb, resSlot); + sb.append("\n"); + appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot)); + return sb; + } + + StringBuilder appendReqQueue(StringBuilder sb, long head) { + while (head != -1) { + reqArenaMgr.appendRecord(sb, head); + sb.append("\n"); + head = reqArenaMgr.getNextRequest(head); + } + return sb; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java index aaa96bb..1dbf16b 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java @@ -392,7 +392,6 @@ class LockRequestController implements Runnable { class LockRequestWorker implements Runnable { String threadName; - TransactionSubsystem txnProvider; ILockManager lockMgr; WorkerReadyQueue workerReadyQueue; LockRequest lockRequest; @@ -401,7 +400,6 @@ class LockRequestWorker implements Runnable { boolean isDone; public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) { - this.txnProvider = txnProvider; this.lockMgr = txnProvider.getLockManager(); this.workerReadyQueue = workerReadyQueue; this.threadName = new String(threadName); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java new file mode 100644 index 0000000..bec4e53 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.locking; + +import org.apache.asterix.common.transactions.ITransactionContext; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A ResourceGroup represents a group of resources that are manged by a ConcurrentLockManager. + * All resources in a group share a common latch. I.e. all modifications of lock requests for any resource in a group + * are protected by the same latch. + * + * @see ConcurrentLockManager + */ +class ResourceGroup { + private ReentrantReadWriteLock latch; + private Condition condition; + AtomicLong firstResourceIndex; + + ResourceGroup() { + latch = new ReentrantReadWriteLock(); + condition = latch.writeLock().newCondition(); + firstResourceIndex = new AtomicLong(-1); + } + + void getLatch() { + log("latch"); + latch.writeLock().lock(); + } + + boolean tryLatch(long timeout, TimeUnit unit) throws InterruptedException { + log("tryLatch"); + try { + return latch.writeLock().tryLock(timeout, unit); + } catch (InterruptedException e) { + ConcurrentLockManager.LOGGER.finer("interrupted while wating on ResourceGroup"); + throw e; + } + } + + void releaseLatch() { + log("release"); + latch.writeLock().unlock(); + } + + boolean hasWaiters() { + return latch.hasQueuedThreads(); + } + + void await(ITransactionContext txnContext) throws InterruptedException { + log("wait for"); + try { + condition.await(); + } catch (InterruptedException e) { + ConcurrentLockManager.LOGGER.finer("interrupted while waiting on ResourceGroup"); + throw e; + } + } + + void wakeUp() { + log("notify"); + condition.signalAll(); + } + + void log(String s) { + if (ConcurrentLockManager.LOGGER.isLoggable(ConcurrentLockManager.LVL)) { + ConcurrentLockManager.LOGGER.log(ConcurrentLockManager.LVL, s + " " + toString()); + } + } + + public String toString() { + return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) + ", " + + "waiters : " + (hasWaiters() ? "true" : "false") + " }"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java new file mode 100644 index 0000000..213ccd9 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.locking; + +/** + * A hash table for ResourceGroups. As each ResourceGroup has a latch that protects the modifications for resources in + * that group, the size of a ResourceGroupTable determines the maximal number of lock requests that can concurrently + * be served by a ConcurrentLockManager. + * + * @see ResourceGroup + * @see ConcurrentLockManager + */ + +class ResourceGroupTable { + public final int size; + + private ResourceGroup[] table; + + public ResourceGroupTable(int size) { + this.size = size; + table = new ResourceGroup[size]; + for (int i = 0; i < size; ++i) { + table[i] = new ResourceGroup(); + } + } + + ResourceGroup get(int dId, int entityHashValue) { + // TODO ensure good properties of hash function + int h = Math.abs(dId ^ entityHashValue); + if (h < 0) h = 0; + return table[h % size]; + } + + ResourceGroup get(int i) { + return table[i]; + } + + public void getAllLatches() { + for (int i = 0; i < size; ++i) { + table[i].getLatch(); + } + } + + public void releaseAllLatches() { + for (int i = 0; i < size; ++i) { + table[i].releaseLatch(); + } + } + + public StringBuilder append(StringBuilder sb) { + return append(sb, false); + } + + public StringBuilder append(StringBuilder sb, boolean detail) { + for (int i = 0; i < table.length; ++i) { + sb.append(i).append(" : "); + if (detail) { + sb.append(table[i]); + } else { + sb.append(table[i].firstResourceIndex); + } + sb.append('\n'); + } + return sb; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java new file mode 100644 index 0000000..90c1f69 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.transaction.management.service.locking; + +import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; + +import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; + +/** + * Creates a JSON serialization of the lock table of the ConcurrentLockManager organized by resource. I.e. the + * serialization will contain all resources for which lock request are recorded in the table - along with a list of + * the requests for each resource. + * + * @see ConcurrentLockManager + */ +public class ResourceTablePrinter implements TablePrinter { + private ResourceGroupTable table; + private ResourceArenaManager resArenaMgr; + private RequestArenaManager reqArenaMgr; + private JobArenaManager jobArenaMgr; + + ResourceTablePrinter(ResourceGroupTable table, + ResourceArenaManager resArenaMgr, + RequestArenaManager reqArenaMgr, + JobArenaManager jobArenaMgr) { + this.table = table; + this.resArenaMgr = resArenaMgr; + this.reqArenaMgr = reqArenaMgr; + this.jobArenaMgr = jobArenaMgr; + } + + public StringBuilder append(StringBuilder sb) { + table.getAllLatches(); + sb.append("[\n"); + int i = 0; + long res = -1; + while (res == -1 && i < table.size) { + res = table.get(i++).firstResourceIndex.get(); + } + while (i < table.size) { + sb = appendResource(sb, res); + res = resArenaMgr.getNext(res); + while (res == -1 && i < table.size) { + res = table.get(i++).firstResourceIndex.get(); + } + if (res == -1) { + sb.append("\n"); + break; + } else { + sb.append(",\n"); + } + } + table.releaseAllLatches(); + return sb.append("]"); + } + + StringBuilder appendResource(StringBuilder sb, long res) { + sb.append("{ \"dataset\": ").append(resArenaMgr.getDatasetId(res)); + sb.append(", \"hash\": ").append(resArenaMgr.getPkHashVal(res)); + sb.append(", \"max mode\": ").append(string(resArenaMgr.getMaxMode(res))); + long lastHolder = resArenaMgr.getLastHolder(res); + if (lastHolder != -1) { + sb = appendRequests(sb.append(", \"holders\": "), lastHolder); + } + long firstUpgrader = resArenaMgr.getFirstUpgrader(res); + if (firstUpgrader != -1) { + sb = appendRequests(sb.append(", \"upgraders\": "), firstUpgrader); + } + long firstWaiter = resArenaMgr.getFirstWaiter(res); + if (firstWaiter != -1) { + sb = appendRequests(sb.append(", \"waiters\": "), firstWaiter); + } + return sb.append(" }"); + } + + StringBuilder appendRequests(StringBuilder sb, long req) { + sb.append("[ "); + while (req != -1) { + appendRequest(sb, req); + req = reqArenaMgr.getNextRequest(req); + sb.append(req == -1 ? " ]" : ", "); + } + return sb; + } + + StringBuilder appendRequest(StringBuilder sb, long req) { + long job = reqArenaMgr.getJobSlot(req); + sb.append("{ \"job\": ").append(jobArenaMgr.getJobId(job)); + sb.append(", \"mode\": \"").append(string(reqArenaMgr.getLockMode(req))); + return sb.append("\" }"); + } + + private static final String string(int lockMode) { + return LockMode.toString((byte) lockMode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java new file mode 100644 index 0000000..2b4260b --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.transaction.management.service.locking; + +public interface TablePrinter { + StringBuilder append(StringBuilder sb); +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java new file mode 100644 index 0000000..8171f77 --- /dev/null +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.locking; + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ITransactionContext; + +public class WaitInterruptedException extends ACIDException { + public WaitInterruptedException(ITransactionContext txnContext, String message, Throwable cause) { + super(txnContext, message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java index d371e94..6650ac6 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java @@ -54,7 +54,7 @@ public class TransactionSubsystem implements ITransactionSubsystem { this.id = id; this.txnProperties = txnProperties; this.transactionManager = new TransactionManager(this); - this.lockManager = new ConcurrentLockManager(this); + this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); AsterixReplicationProperties asterixReplicationProperties = null; if (asterixAppRuntimeContextProvider != null) { @@ -108,4 +108,4 @@ public class TransactionSubsystem implements ITransactionSubsystem { return id; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java new file mode 100644 index 0000000..5cabd04 --- /dev/null +++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java @@ -0,0 +1,398 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.asterix.transaction.management.service.locking; + + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.JobId; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.ConsoleHandler; +import java.util.logging.Logger; + +import static org.apache.asterix.transaction.management.service.locking.Request.Kind; +import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class LockManagerUnitTest { + + public static int LOCK_MGR_SHRINK_TIMER = 5000; + public static int LOCK_MGR_ARENAS = 2; + public static int LOCK_MGR_TABLE_SIZE = 10; + + static int INITIAL_TIMESTAMP = 0; + static long COORDINATOR_SLEEP = 20; + static int TIMEOUT_MS = 100; + + static { + Logger.getLogger(ConcurrentLockManager.class.getName()).addHandler(new ConsoleHandler()); + } + + Map jobMap; + ILockManager lockMgr; + + // set to e.g. System.err to get some output + PrintStream out = System.out; + PrintStream err = null; //System.err; + + //-------------------------------------------------------------------- + // JUnit methods + //-------------------------------------------------------------------- + + @Before + public void setUp() throws Exception { + jobMap = new HashMap<>(); + lockMgr = new ConcurrentLockManager(LOCK_MGR_SHRINK_TIMER, LOCK_MGR_ARENAS, LOCK_MGR_TABLE_SIZE); + } + + @After + public void tearDown() throws Exception { + lockMgr = null; + jobMap = null; + } + + @Test + public void testSimpleSharedUnlock() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.UNLOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.UNLOCK, j(1), d(1), e(-1), LockMode.IS)); + reportErrors(execute(reqs)); + } + + @Test + public void testSimpleSharedRelease() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.RELEASE, j(1))); + reportErrors(execute(reqs)); + } + + @Test + public void testReacquire() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.RELEASE, j(1))); + reportErrors(execute(reqs)); + } + + @Test + public void testInstant() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.PRINT)); + reqs.add(req(Kind.INSTANT_LOCK, j(3), d(1), e(1), LockMode.S)); + expectError(execute(reqs), j(3), WaitInterruptedException.class); + } + + @Test + public void testTry() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.TRY_LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.PRINT)); + reqs.add(req(Kind.TRY_LOCK, j(3), d(1), e(1), LockMode.X)); + reportErrors(execute(reqs)); + } + + @Test + public void testInstantTry() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.PRINT)); + reqs.add(req(Kind.INSTANT_TRY_LOCK, j(3), d(1), e(1), LockMode.S)); + reportErrors(execute(reqs)); + } + + @Test + public void testDeadlock() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(2), LockMode.X)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.LOCK, j(1), d(1), e(2), LockMode.X)); + reqs.add(req(Kind.RELEASE, j(1))); + reqs.add(req(Kind.RELEASE, j(2))); + expectError(execute(reqs), j(1), ACIDException.class); + } + + @Test + public void testUpgrade() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.RELEASE, j(1))); + reportErrors(execute(reqs)); + } + + @Test + public void testUpgradeDeadlock() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.PRINT)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.RELEASE, j(1))); + reqs.add(req(Kind.RELEASE, j(2))); + expectError(execute(reqs), j(2), ACIDException.class); + } + + @Test + /** + * Runs into a time-out and j(1) gets interrupted by + * the test. This scenario happens only in this test as there + * is additional synchronization between the locking threads + * through the coordinator. + */ + public void testTimeout() throws Exception { + List reqs = new ArrayList<>(); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S)); + reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X)); + reqs.add(req(Kind.RELEASE, j(1))); + reqs.add(req(Kind.RELEASE, j(2))); + // this runs into a time-out and j(1) gets interrupted + expectError(execute(reqs), j(1), WaitInterruptedException.class); + } + + //-------------------------------------------------------------------- + // Helper methods + //-------------------------------------------------------------------- + + /** + * Executes a list of requests where + * a) each job runs in a different thread and + * b) the threads/jobs are synchronized + * The synchronization ensures that the requests are submitted to the + * LockManager in list order, however they are fulfilled in the order + * decided by the LockManager + * + * @param reqs a list of requests that will be execute in order + * @return a map of (JodId, exception) pairs that can either be handled + * by the test or thrown using #reportErrors + */ + private Map execute(List reqs) throws InterruptedException { + if (err != null) { + err.println("*** start ***"); + } + final AtomicInteger timeStamp = new AtomicInteger(INITIAL_TIMESTAMP); + Set lockers = createLockers(reqs, timeStamp); + Map threads = startThreads(lockers); + + int coordinatorTime = timeStamp.get(); + while (active(lockers)) { + if (err != null) { + err.println("coordinatorTime = " + coordinatorTime); + } + if (coordinatorTime == timeStamp.get()) { + Thread.sleep(COORDINATOR_SLEEP); + if (coordinatorTime == timeStamp.get()) { + Locker timedOut = timedOut(lockers); + if (timedOut != null) { + if (err != null) { + err.println(timedOut.name + " timed out"); + } + break; + } + } + } + coordinatorTime = timeStamp.get(); + } + Map result = stopThreads(lockers, threads); + return result; + } + + private boolean active(Set lockers) { + for (Locker locker : lockers) { + if (locker.active()) { + return true; + } + } + return false; + } + + private Locker timedOut(Set lockers) { + for (Locker locker : lockers) { + if (locker.timedOut()) { + return locker; + } + } + return null; + } + + private Set createLockers(List reqs, AtomicInteger timeStamp) { + Set lockers = new HashSet<>(); + lockers.add(new Locker(lockMgr, null, reqs, timeStamp, err)); + for (ITransactionContext txnCtx : jobMap.values()) { + Locker locker = new Locker(lockMgr, txnCtx, reqs, timeStamp, err); + lockers.add(locker); + } + return lockers; + } + + private Map startThreads(Set lockers) { + Map threads = new HashMap<>(lockers.size()); + for (Locker locker : lockers) { + Thread t = new Thread(locker, locker.name); + threads.put(locker.name, t); + t.start(); + } + return threads; + } + + private Map stopThreads(Set lockers, Map threads) throws + InterruptedException { + Map result = new HashMap<>(); + for (Locker locker : lockers) { + stopThread(threads.get(locker.name)); + List errors = locker.getErrors(); + if (errors != null) { + errors.forEach(error -> result.put(locker.name, error)); + } + } + return result; + } + + private void stopThread(Thread t) throws InterruptedException { + if (err != null) { + err.println("stopping " + t.getName() + " " + t.getState()); + } + boolean done = false; + while (!done) { + switch (t.getState()) { + case NEW: + case RUNNABLE: + case TERMINATED: + done = true; + break; + default: + if (err != null) { + err.println("interrupting " + t.getName()); + } + t.interrupt(); + } + } + if (err != null) { + err.println("joining " + t.getName()); + } + t.join(); + } + + /** + * throws the first Throwable found in the map. + * This is the default way to handle the errors returned by #execute + * + * @param errors a map of (JodId, exception) pairs + */ + void reportErrors(Map errors) { + for (String name : errors.keySet()) { + throw new AssertionError("job " + name + " caught something", errors.get(name)); + } + out.println("no errors"); + } + + void printErrors(Map errors) { + errors.keySet().forEach(name -> out.println("Thread " + name + " caught " + errors.get(name))); + } + + /** + * gets the error for a specific job from the errors map + * + * @param errors a map of (JodId, throwable) pairs + * @param txnCtx the transaction context of the job whose error is requested + * @return throwable for said error + */ + private static Throwable getError(Map errors, ITransactionContext txnCtx) { + return errors.get(txnCtx.getJobId().toString()); + } + + /** + * asserts that the error for a specific job from the errors map is of a specific class + * + * @param errors a map of (JodId, throwable) pairs + * @param txnCtx the transaction context of the job whose error is requested + * @param clazz the exception class + */ + private void expectError(Map errors, ITransactionContext txnCtx, + Class clazz) throws Exception { + Throwable error = getError(errors, txnCtx); + if (error == null) { + throw new AssertionError("expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got no " + + "exception"); + } + if (!clazz.isInstance(error)) { + throw new AssertionError(error); + } + out.println("caught expected " + error); + } + + //-------------------------------------------------------------------- + // Convenience methods to make test description more compact + //-------------------------------------------------------------------- + + private Request req(final Kind kind, final ITransactionContext txnCtx, + final DatasetId dsId, final int hashValue, final byte lockMode) { + return Request.create(kind, txnCtx, dsId, hashValue, lockMode); + } + + private Request req(final Kind kind, final ITransactionContext txnCtx) { + return Request.create(kind, txnCtx); + } + + private Request req(final Kind kind) { + return Request.create(kind, out); + } + + private static DatasetId d(int id) { + return new DatasetId(id); + } + + private static int e(int i) { + return i; + } + + private ITransactionContext j(int jId) { + if (!jobMap.containsKey(jId)) { + ITransactionContext mockTxnContext = mock(ITransactionContext.class); + when(mockTxnContext.getJobId()).thenReturn(new JobId(jId)); + jobMap.put(jId, mockTxnContext); + } + return jobMap.get(jId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java new file mode 100644 index 0000000..164fc07 --- /dev/null +++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java @@ -0,0 +1,187 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.asterix.transaction.management.service.locking; + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.junit.Assert; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Executes a sequence of lock requests against an ILockManager. + * Lockers are run by different threads in the LockManagerUnitTest. + * + * @see ILockManager + * @see LockManagerUnitTest + */ +class Locker implements Runnable { + + public String name; + + private ILockManager lockMgr; + + private List requests; + private Iterator reqIter; + private volatile Requester curReq; + private int reqStart; + + private AtomicInteger globalTime; + private List errors; + + private PrintStream err; + + /** + * @param lockMgr the ILockManager to send requests to + * @param txnCtx the ITransactionContext that identifies the transaction that this Locker represents + * @param allRequests an ordered list of lock requests for multiple transactions, this Locker will only execute + * requests for the transaction identified by txnCtx + * @param time a global timestamp that is used to synchronize different lockers to ensure that requests are started + * in the order given in allRequests + * @param err a stream to write log/error information to + * + * @see Request + */ + Locker(ILockManager lockMgr, ITransactionContext txnCtx, List allRequests, AtomicInteger time, + PrintStream err) { + this.name = txnCtx == null ? "admin" : txnCtx.getJobId().toString(); + this.lockMgr = lockMgr; + + this.requests = new LinkedList<>(); + for (int pos = 0; pos < allRequests.size(); ++pos) { + Request req = allRequests.get(pos); + if (req.txnCtx == txnCtx) { + requests.add(new Requester(pos, req)); + } + } + this.reqIter = requests.iterator(); + this.globalTime = time; + this.err = err; + } + + private boolean hasErrors() { + return errors != null && errors.size() > 0; + } + + synchronized List getErrors() { + return errors; + } + + private synchronized void addError(Throwable error) { + log("caught " + error); + if (this.errors == null) { + this.errors = Collections.synchronizedList(new ArrayList()); + } + this.errors.add(error); + } + + public synchronized boolean active() { + return !hasErrors() && (reqIter.hasNext() || curReq != null); + } + + public synchronized boolean timedOut() { + return reqStart > 0 && (currentTime() - reqStart) > LockManagerUnitTest.TIMEOUT_MS; + } + + @Override + public void run() { + log("running"); + try { + while (! hasErrors() && reqIter.hasNext()) { + curReq = reqIter.next(); + int localTime = globalTime.get(); + while (localTime < curReq.time) { + Thread.sleep(10); + localTime = globalTime.get(); + } + if (localTime != curReq.time) { + throw new AssertionError("missed time for request " + curReq); + } + log("will exec at t=" + localTime + " " + curReq); + try { + reqStart = currentTime(); + Assert.assertEquals(localTime, globalTime.getAndIncrement()); + log("incremented"); + curReq.setResult(curReq.request.execute(lockMgr) ? Requester.SUCCESS : Requester.FAIL); + } catch (ACIDException e) { + curReq.setResult(Requester.ERROR); + addError(e); + } finally { + reqStart = -1; + } + log("time " + localTime); + } + curReq = null; + } catch (InterruptedException ie) { + log("got interrupted"); + } catch (Throwable e) { + if (err != null) { + e.printStackTrace(err); + } + addError(e); + } + log("done"); + } + + private void log(String msg) { + if (err != null) { + err.println(Thread.currentThread().getName() + " " + msg); + } + } + + private static int currentTime() { + return ((int) System.currentTimeMillis()) & 0x7fffffff; + } + + public String toString() { + return "[" + name + "]" + curReq; + } +} + +class Requester { + + public static byte NONE = -1; + public static byte FAIL = 0; + public static byte SUCCESS = 1; + public static byte ERROR = 2; + + int time; + Request request; + byte result = NONE; + + Requester(int time, Request request) { + this.time = time; + this.request = request; + } + + void setResult(byte res) { + result = res; + } + + public String toString() { + return request.toString() + " t=" + time; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f8daac23/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java new file mode 100644 index 0000000..c7e0c42 --- /dev/null +++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java @@ -0,0 +1,166 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.asterix.transaction.management.service.locking; + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; + +import java.io.PrintStream; + +/** + * repesents a lock request for testing. + */ +abstract class Request { + /** the kind of a request */ + enum Kind { + /** requests an instant-try-lock */ + INSTANT_TRY_LOCK, + /** requests an instant-lock */ + INSTANT_LOCK, + /** requests a lock */ + LOCK, + /** prints a JSON representation of the lock table by entity */ + PRINT, + /** releases all locks */ + RELEASE, + /** requests a try-lock */ + TRY_LOCK, + /** unlocks a lock */ + UNLOCK + } + + Kind kind; + ITransactionContext txnCtx; + + Request(Kind kind, ITransactionContext txnCtx) { + this.kind = kind; + this.txnCtx = txnCtx; + } + + String asString(final Kind kind, final ITransactionContext txnCtx, + final DatasetId dsId, final int hashValue, final byte lockMode) { + return txnCtx.getJobId().toString() + ":" + kind.name() + ":" + dsId.getId() + ":" + hashValue + ":" + + TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode); + } + + abstract boolean execute(ILockManager lockMgr) throws ACIDException; + + static Request create(final Kind kind, final ITransactionContext txnCtx, + final DatasetId dsId, final int hashValue, final byte lockMode) { + switch (kind) { + case INSTANT_TRY_LOCK: + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + return lockMgr.instantTryLock(dsId, hashValue, lockMode, txnCtx); + } + + public String toString() { + return asString(kind, txnCtx, dsId, hashValue, lockMode); + } + }; + case INSTANT_LOCK: + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + lockMgr.instantLock(dsId, hashValue, lockMode, txnCtx); + return true; + } + + public String toString() { + return asString(kind, txnCtx, dsId, hashValue, lockMode); + } + }; + case LOCK: + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + lockMgr.lock(dsId, hashValue, lockMode, txnCtx); + return true; + } + + public String toString() { + return asString(kind, txnCtx, dsId, hashValue, lockMode); + } + }; + case TRY_LOCK: + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + return lockMgr.tryLock(dsId, hashValue, lockMode, txnCtx); + } + + public String toString() { + return asString(kind, txnCtx, dsId, hashValue, lockMode); + } + }; + case UNLOCK: + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + lockMgr.unlock(dsId, hashValue, lockMode, txnCtx); + return true; + } + + public String toString() { + return asString(kind, txnCtx, dsId, hashValue, lockMode); + } + }; + default: + } + throw new AssertionError("Illegal Request Kind " + kind); + } + + static Request create(final Kind kind, final ITransactionContext txnCtx) { + if (kind == Kind.RELEASE) { + return new Request(kind, txnCtx) { + boolean execute(ILockManager lockMgr) throws ACIDException { + lockMgr.releaseLocks(txnCtx); + return true; + } + + public String toString() { + return txnCtx.getJobId().toString() + ":" + kind.name(); + } + }; + } + throw new AssertionError("Illegal Request Kind " + kind); + } + + static Request create(final Kind kind, final PrintStream out) { + if (kind == Kind.PRINT) { + return new Request(kind, null) { + boolean execute(ILockManager lockMgr) throws ACIDException { + if (out == null) { + return false; + } + if (!(lockMgr instanceof ConcurrentLockManager)) { + out.print("cannot print"); + return false; + } + out.print(((ConcurrentLockManager) lockMgr).printByResource()); + return true; + } + + public String toString() { + return kind.name(); + } + }; + } + throw new AssertionError("Illegal Request Kind " + kind); + } +}