asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Till Westmann (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1118: allow for lock conversion
Date Fri, 30 Oct 2015 23:54:29 GMT
Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/474

Change subject: ASTERIXDB-1118: allow for lock conversion
......................................................................

ASTERIXDB-1118: allow for lock conversion

Also improve debuggability of ConcurrentLockManager and add new unit tests.

Change-Id: If49ed8d48fa8c71a52c880d4f42a2badbe6a57d7
---
M asterix-transactions/pom.xml
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
A asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
A asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
A asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
A asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
A asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
A asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
10 files changed, 1,112 insertions(+), 253 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/74/474/1

diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index b8673bb..6330801 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -111,6 +111,12 @@
 			<artifactId>guava</artifactId>
 			<version>18.0</version>
 		</dependency>
-	</dependencies>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index de8daf8..85383e7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -19,6 +19,14 @@
 
 package org.apache.asterix.transaction.management.service.locking;
 
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -31,39 +39,26 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-
 /**
  * An implementation of the ILockManager interface.
- *
- * @author tillw
  */
 public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
 
-    private static final Logger LOGGER
-        = Logger.getLogger(ConcurrentLockManager.class.getName());
-    private static final Level LVL = Level.FINER;
-    
+    static final Logger LOGGER
+            = Logger.getLogger(ConcurrentLockManager.class.getName());
+    static final Level LVL = Level.FINER;
+
     public static final boolean DEBUG_MODE = false;//true
     public static final boolean CHECK_CONSISTENCY = false;
 
-    private TransactionSubsystem txnSubsystem;
     private ResourceGroupTable table;
     private ResourceArenaManager resArenaMgr;
     private RequestArenaManager reqArenaMgr;
     private JobArenaManager jobArenaMgr;
     private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
     private ThreadLocal<DatasetLockCache> dsLockCache;
-    private LockManagerStats stats = new LockManagerStats(10000); 
-    
+    private LockManagerStats stats = new LockManagerStats(10000);
+
     enum LockAction {
         ERR(false, false),
         GET(false, false),
@@ -82,22 +77,20 @@
 
     static LockAction[][] ACTION_MATRIX = {
             // new    NL              IS               IX                S                X
-            { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
-            { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
-            { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
-            { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
-            { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+            {LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD}, // NL
+            {LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT}, // IS
+            {LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT}, // IX
+            {LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT}, // S
+            {LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT} // X
     };
 
-    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
-        this.txnSubsystem = txnSubsystem;
+    public ConcurrentLockManager(final int lockManagerShrinkTimer) throws ACIDException {
+        this(lockManagerShrinkTimer, Runtime.getRuntime().availableProcessors() * 2, 1024);
+        // TODO increase table size?
+    }
 
-        this.table = new ResourceGroupTable();
-
-        final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer();
-
-        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
-
+    public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize) throws ACIDException {
+        this.table = new ResourceGroupTable(tableSize);
         resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
         reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
         jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
@@ -109,17 +102,13 @@
         };
     }
 
-    public AsterixTransactionProperties getTransactionProperties() {
-        return this.txnSubsystem.getTransactionProperties();
-    }
-
     @Override
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.lock();
-        
-        final int dsId = datasetId.getId();        
+
+        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
@@ -131,7 +120,7 @@
         }
 
         final long jobSlot = findOrAllocJobSlot(jobId);
-        
+
         final ResourceGroup group = table.get(dsId, entityHashValue);
         group.getLatch();
         try {
@@ -143,6 +132,18 @@
             while (!locked) {
                 final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
                 switch (act) {
+                    case CONV:
+                        if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+                            DeadlockTracker tracker = new CollectingTracker();
+                            tracker.pushJob(jobSlot);
+                            introducesDeadlock(resSlot, jobSlot, tracker);
+                            requestAbort(txnContext, tracker.toString());
+                            break;
+                        } else if (hasOtherHolders(resSlot, jobSlot)) {
+                            enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                            break;
+                        }
+                        //no break
                     case UPD:
                         resArenaMgr.setMaxMode(resSlot, lockMode);
                         // no break
@@ -151,7 +152,6 @@
                         locked = true;
                         break;
                     case WAIT:
-                    case CONV:
                         enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
                         break;
                     case ERR:
@@ -165,12 +165,12 @@
         } finally {
             group.releaseLatch();
         }
-        
+
         if (CHECK_CONSISTENCY) assertLocksCanBefoundInJobQueue();
     }
 
     private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
-            final LockAction act, ITransactionContext txnContext) throws ACIDException {
+                               final LockAction act, ITransactionContext txnContext) throws ACIDException {
         final Queue queue = act.modify ? upgrader : waiter;
         if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
             DeadlockTracker tracker = new CollectingTracker();
@@ -193,8 +193,8 @@
         void pushJob(long jobSlot);
         void pop();
     }
-    
-    static class NOPTracker implements DeadlockTracker {        
+
+    static class NOPTracker implements DeadlockTracker {
         static final DeadlockTracker INSTANCE = new NOPTracker();
 
         public void pushResource(long resSlot) {}
@@ -202,8 +202,11 @@
         public void pushJob(long jobSlot) {}
         public void pop() {}
     }
-    
+
     static class CollectingTracker implements DeadlockTracker {
+
+        static final boolean DEBUG = false;
+
         ArrayList<Long> slots = new ArrayList<Long>();
         ArrayList<String> types = new ArrayList<String>();
 
@@ -211,30 +214,30 @@
         public void pushResource(long resSlot) {
             types.add("Resource");
             slots.add(resSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pushRequest(long reqSlot) {
             types.add("Request");
             slots.add(reqSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pushJob(long jobSlot) {
             types.add("Job");
             slots.add(jobSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pop() {
-            System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
             types.remove(types.size() - 1);
-            slots.remove(slots.size() - 1);            
+            slots.remove(slots.size() - 1);
         }
-        
+
         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
@@ -244,19 +247,22 @@
             return sb.toString();
         }
     }
-        
+
     /**
      * determine if adding a job to the waiters of a resource will introduce a
      * cycle in the wait-graph where the job waits on itself
-     * 
-     * @param resSlot
-     *            the slot that contains the information about the resource
-     * @param jobSlot
-     *            the slot that contains the information about the job
+     *
+     * @param resSlot the slot that contains the information about the resource
+     * @param jobSlot the slot that contains the information about the job
      * @return true if a cycle would be introduced, false otherwise
      */
     private boolean introducesDeadlock(final long resSlot, final long jobSlot,
-            final DeadlockTracker tracker) {
+                                       final DeadlockTracker tracker) {
+        return introducesDeadlock(resSlot, jobSlot, tracker, 0);
+    }
+
+    private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+                                       final DeadlockTracker tracker, final int depth) {
         synchronized (jobArenaMgr) {
             tracker.pushResource(resSlot);
             long reqSlot = resArenaMgr.getLastHolder(resSlot);
@@ -264,14 +270,19 @@
                 tracker.pushRequest(reqSlot);
                 final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
                 tracker.pushJob(holderJobSlot);
-                if (holderJobSlot == jobSlot) {
+                if (holderJobSlot == jobSlot && depth != 0) {
                     return true;
                 }
+
                 boolean scanWaiters = true;
                 long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+                if (waiter < 0 && scanWaiters) {
+                    scanWaiters = false;
+                    waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+                }
                 while (waiter >= 0) {
-                    long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
-                    if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
+                    long waitingOnResSlot = reqArenaMgr.getResourceId(waiter);
+                    if (introducesDeadlock(waitingOnResSlot, jobSlot, tracker, depth + 1)) {
                         return true;
                     }
                     waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -280,6 +291,7 @@
                         waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
                     }
                 }
+
                 tracker.pop(); // job
                 tracker.pop(); // request
                 reqSlot = reqArenaMgr.getNextRequest(reqSlot);
@@ -288,14 +300,14 @@
             return false;
         }
     }
-    
+
     @Override
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.instantLock();
-        
-        final int dsId = datasetId.getId();        
+
+        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
@@ -360,12 +372,12 @@
             throws ACIDException {
         log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.tryLock();
-        
+
         final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
-            if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+            if (!tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
                 return false;
             }
         } else {
@@ -375,7 +387,7 @@
         }
 
         final long jobSlot = findOrAllocJobSlot(jobId);
-        
+
         final ResourceGroup group = table.get(dsId, entityHashValue);
         group.getLatch();
 
@@ -412,15 +424,15 @@
 
     @Override
     public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
-            ITransactionContext txnContext) throws ACIDException {
+                                  ITransactionContext txnContext) throws ACIDException {
         log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.instantTryLock();
-        
+
         final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
-            if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+            if (!tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
                 return false;
             }
         } else {
@@ -486,9 +498,9 @@
             if (resource < 0) {
                 throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
             }
-            
+
             if (CHECK_CONSISTENCY) assertLocksCanBefoundInJobQueue();
-            
+
             long holder = removeLastHolder(resource, jobSlot, lockMode);
 
             // deallocate request
@@ -511,17 +523,15 @@
                 final int oldMaxMode = resArenaMgr.getMaxMode(resource);
                 final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
                 resArenaMgr.setMaxMode(resource, newMaxMode);
-                if (oldMaxMode != newMaxMode) {
-                    // the locking mode didn't change, current waiters won't be
-                    // able to acquire the lock, so we do not need to signal them
-                    group.wakeUp();
-                }
+                group.wakeUp();
             }
         } finally {
             group.releaseLatch();
         }
 
-        // dataset intention locks are cleaned up at the end of the job
+        // dataset intention locks are
+        // a) kept in dsLockCache and
+        // b) cleaned up only in releaseLocks at the end of the job
     }
 
     @Override
@@ -590,9 +600,11 @@
             resArenaMgr.setPkHashVal(resSlot, entityHashValue);
             resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
             group.firstResourceIndex.set(resSlot);
-            if (DEBUG_MODE) LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
+            if (DEBUG_MODE)
+                LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
         } else {
-            if (DEBUG_MODE) LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
+            if (DEBUG_MODE)
+                LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
         }
         return resSlot;
     }
@@ -626,13 +638,10 @@
      * a) (wait and) convert the lock once conversion becomes viable or
      * b) acquire the lock if we want to lock the same resource with the same
      * lock mode for the same job.
-     * 
-     * @param resource
-     *            the resource slot that's being locked
-     * @param job
-     *            the job slot of the job locking the resource
-     * @param lockMode
-     *            the lock mode that the resource should be locked with
+     *
+     * @param resource the resource slot that's being locked
+     * @param job      the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
      * @return
      */
     private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
@@ -678,6 +687,17 @@
             insertIntoJobQueue(request, lastJobHolder);
             jobArenaMgr.setLastHolder(job, request);
         }
+    }
+
+    private boolean hasOtherHolders(long resSlot, long jobSlot) {
+        long holder = resArenaMgr.getLastHolder(resSlot);
+        while (holder != -1) {
+            if (reqArenaMgr.getJobSlot(holder) != jobSlot) {
+                return true;
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return false;
     }
 
     private long removeLastHolder(long resource, long jobSlot, byte lockMode) {
@@ -830,13 +850,10 @@
      * remove the first request for a given job and lock mode from a request queue.
      * If the value of the parameter lockMode is LockMode.ANY the first request
      * for the job is removed - independent of the LockMode.
-     * 
-     * @param head
-     *            the head of the request queue
-     * @param jobSlot
-     *            the job slot
-     * @param lockMode
-     *            the lock mode
+     *
+     * @param head     the head of the request queue
+     * @param jobSlot  the job slot
+     * @param lockMode the lock mode
      * @return the slot of the first request that matched the given job
      */
     private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
@@ -904,9 +921,9 @@
     /*
      * Debugging support
      */
-    
+
     private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
-        if (! LOGGER.isLoggable(LVL)) {
+        if (!LOGGER.isLoggable(LVL)) {
             return;
         }
         StringBuilder sb = new StringBuilder();
@@ -929,7 +946,7 @@
     }
 
     private void assertLocksCanBefoundInJobQueue() throws ACIDException {
-        for (int i = 0; i < ResourceGroupTable.TABLE_SIZE; ++i) {
+        for (int i = 0; i < table.size; ++i) {
             final ResourceGroup group = table.get(i);
             if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
                 try {
@@ -955,23 +972,24 @@
             }
         }
     }
-    
+
     private void assertLockCanBeFoundInJobQueue(int dsId, int entityHashValue, byte lockMode, int jobId) {
         if (findLockInJobQueue(dsId, entityHashValue, jobId, lockMode) == -1) {
             String msg = "request for " + LockMode.toString(lockMode) + " lock on dataset " + dsId + " entity "
                     + entityHashValue + " not found for job " + jobId + " in thread " + Thread.currentThread().getName();
-            LOGGER.severe(msg);            
+            LOGGER.severe(msg);
             throw new IllegalStateException(msg);
         }
     }
 
     /**
      * tries to find a lock request searching though the job queue
-     * @param dsId dataset id
+     *
+     * @param dsId            dataset id
      * @param entityHashValue primary key hash value
-     * @param jobId job id
-     * @param lockMode lock mode
-     * @return the slot of the request, if the lock request is found, -1 otherwise 
+     * @param jobId           job id
+     * @param lockMode        lock mode
+     * @return the slot of the request, if the lock request is found, -1 otherwise
      */
     private long findLockInJobQueue(final int dsId, final int entityHashValue, final int jobId, byte lockMode) {
         Long jobSlot = jobIdSlotMap.get(jobId);
@@ -989,7 +1007,7 @@
                     && entityHashValue == resArenaMgr.getPkHashVal(resource)
                     && jobSlot == reqArenaMgr.getJobSlot(holder)
                     && (lockMode == reqArenaMgr.getLockMode(holder)
-                        || lockMode == LockMode.ANY)) {
+                    || lockMode == LockMode.ANY)) {
                 return holder;
             }
             synchronized (jobArenaMgr) {
@@ -999,66 +1017,30 @@
         return -1;
     }
 
-    private String resQueueToString(long resSlot) {
-        return appendResQueue(new StringBuilder(), resSlot).toString();
+    private TablePrinter getResourceTablePrinter() {
+        return new ResourceTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr);
     }
-    
-    private StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
-        resArenaMgr.appendRecord(sb, resSlot);
-        sb.append("\n");
-        appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
-        return sb;
+
+    private TablePrinter getDumpTablePrinter() {
+        return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, jobIdSlotMap);
     }
-    
-    private StringBuilder appendReqQueue(StringBuilder sb, long head) {
-        while (head != -1) {
-            reqArenaMgr.appendRecord(sb, head);
-            sb.append("\n");
-            head = reqArenaMgr.getNextRequest(head);
-        }
-        return sb;
-    }
-    
-    public StringBuilder append(StringBuilder sb) {
-        table.getAllLatches();
-        try {
-            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
-            table.append(sb);
-            sb.append(">>dump_end\t>>----- [resTable] -----\n");
 
-            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
-            resArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
-            reqArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
-            for (Integer i : jobIdSlotMap.keySet()) {
-                sb.append(i).append(" : ");
-                TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
-                sb.append("\n");
-            }
-            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
-            jobArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
-        } finally {
-            table.releaseAllLatches();
-        }
-        return sb;
+    public String printByResource() {
+        return getResourceTablePrinter().append(new StringBuilder()).append("\n").toString();
     }
 
     public String toString() {
-        return append(new StringBuilder()).toString();
+        return printByResource();
+    }
+
+    public String dump() {
+        return getDumpTablePrinter().append(new StringBuilder()).toString();
     }
 
     @Override
     public String prettyPrint() throws ACIDException {
         StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
-        return append(s).toString() + "\n";
+        return getDumpTablePrinter().append(s).toString() + "\n";
     }
 
     @Override
@@ -1068,7 +1050,7 @@
 
     @Override
     public void dumpState(OutputStream os) throws IOException {
-        os.write(toString().getBytes());
+        os.write(dump().getBytes());
     }
 
     @Override
@@ -1118,118 +1100,120 @@
         }
     }
 
-    private static class ResourceGroupTable {
-        public static final int TABLE_SIZE = 1024; // TODO increase?
+}
 
-        private ResourceGroup[] table;
+class ResourceGroupTable {
+    public final int size;
 
-        public ResourceGroupTable() {
-            table = new ResourceGroup[TABLE_SIZE];
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i] = new ResourceGroup();
-            }
-        }
+    private ResourceGroup[] table;
 
-        ResourceGroup get(int dId, int entityHashValue) {
-            // TODO ensure good properties of hash function
-            int h = Math.abs(dId ^ entityHashValue);
-            if (h < 0) h = 0;
-            return table[h % TABLE_SIZE];
-        }
-        
-        ResourceGroup get(int i) {
-            return table[i];
-        }
-
-        public void getAllLatches() {
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i].getLatch();
-            }
-        }
-
-        public void releaseAllLatches() {
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i].releaseLatch();
-            }
-        }
-
-        public StringBuilder append(StringBuilder sb) {
-            return append(sb, false);
-        }
-
-        public StringBuilder append(StringBuilder sb, boolean detail) {
-            for (int i = 0; i < table.length; ++i) {
-                sb.append(i).append(" : ");
-                if (detail) {
-                    sb.append(table[i]);
-                } else {
-                    sb.append(table[i].firstResourceIndex);
-                }
-                sb.append('\n');
-            }
-            return sb;
+    public ResourceGroupTable(int size) {
+        this.size = size;
+        table = new ResourceGroup[size];
+        for (int i = 0; i < size; ++i) {
+            table[i] = new ResourceGroup();
         }
     }
 
-    private static class ResourceGroup {
-        private ReentrantReadWriteLock latch;
-        private Condition condition;
-        AtomicLong firstResourceIndex;
+    ResourceGroup get(int dId, int entityHashValue) {
+        // TODO ensure good properties of hash function
+        int h = Math.abs(dId ^ entityHashValue);
+        if (h < 0) h = 0;
+        return table[h % size];
+    }
 
-        ResourceGroup() {
-            latch = new ReentrantReadWriteLock();
-            condition = latch.writeLock().newCondition();
-            firstResourceIndex = new AtomicLong(-1);
-        }
+    ResourceGroup get(int i) {
+        return table[i];
+    }
 
-        void getLatch() {
-            log("latch");
-            latch.writeLock().lock();
+    public void getAllLatches() {
+        for (int i = 0; i < size; ++i) {
+            table[i].getLatch();
         }
-        
-        boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
-            log("tryLatch");
-            try {
-                return latch.writeLock().tryLock(timeout, unit);
-            } catch (InterruptedException e) {
-                LOGGER.finer("interrupted while wating on ResourceGroup");
-                throw new ACIDException("interrupted", e);
+    }
+
+    public void releaseAllLatches() {
+        for (int i = 0; i < size; ++i) {
+            table[i].releaseLatch();
+        }
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        return append(sb, false);
+    }
+
+    public StringBuilder append(StringBuilder sb, boolean detail) {
+        for (int i = 0; i < table.length; ++i) {
+            sb.append(i).append(" : ");
+            if (detail) {
+                sb.append(table[i]);
+            } else {
+                sb.append(table[i].firstResourceIndex);
             }
+            sb.append('\n');
         }
+        return sb;
+    }
+}
 
-        void releaseLatch() {
-            log("release");
-            latch.writeLock().unlock();
-        }
+class ResourceGroup {
+    private ReentrantReadWriteLock latch;
+    private Condition condition;
+    AtomicLong firstResourceIndex;
 
-        boolean hasWaiters() {
-            return latch.hasQueuedThreads();
-        }
+    ResourceGroup() {
+        latch = new ReentrantReadWriteLock();
+        condition = latch.writeLock().newCondition();
+        firstResourceIndex = new AtomicLong(-1);
+    }
 
-        void await(ITransactionContext txnContext) throws ACIDException {
-            log("wait for");
-            try {
-                condition.await();
-            } catch (InterruptedException e) {
-                LOGGER.finer("interrupted while wating on ResourceGroup");
-                throw new ACIDException(txnContext, "interrupted", e);
-            }
-        }
+    void getLatch() {
+        log("latch");
+        latch.writeLock().lock();
+    }
 
-        void wakeUp() {
-            log("notify");
-            condition.signalAll();
+    boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
+        log("tryLatch");
+        try {
+            return latch.writeLock().tryLock(timeout, unit);
+        } catch (InterruptedException e) {
+            ConcurrentLockManager.LOGGER.finer("interrupted while wating on ResourceGroup");
+            throw new ACIDException("interrupted", e);
         }
+    }
 
-        void log(String s) {
-            if (LOGGER.isLoggable(LVL)) {
-                LOGGER.log(LVL, s + " " + toString());
-            }            
-        }
+    void releaseLatch() {
+        log("release");
+        latch.writeLock().unlock();
+    }
 
-        public String toString() {
-            return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) + ", waiters : "
-                    + (hasWaiters() ? "true" : "false") + " }";
+    boolean hasWaiters() {
+        return latch.hasQueuedThreads();
+    }
+
+    void await(ITransactionContext txnContext) throws ACIDException {
+        log("wait for");
+        try {
+            condition.await();
+        } catch (InterruptedException e) {
+            ConcurrentLockManager.LOGGER.finer("interrupted while waiting on ResourceGroup");
+            throw new ACIDException(txnContext, "interrupted", e);
         }
     }
+
+    void wakeUp() {
+        log("notify");
+        condition.signalAll();
+    }
+
+    void log(String s) {
+        if (ConcurrentLockManager.LOGGER.isLoggable(ConcurrentLockManager.LVL)) {
+            ConcurrentLockManager.LOGGER.log(ConcurrentLockManager.LVL, s + " " + toString());
+        }
+    }
+
+    public String toString() {
+        return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) + ", waiters : "
+                + (hasWaiters() ? "true" : "false") + " }";
+    }
 }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
new file mode 100644
index 0000000..ffdb151
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A {@link TablePrinter} that writes the resource table, the resource, request and job arena
+ * managers and the job-id-to-slot map as plain-text sections that are framed by
+ * {@code >>dump_begin} / {@code >>dump_end} marker lines.
+ */
+public class DumpTablePrinter implements TablePrinter {
+    private final ResourceGroupTable table;
+    private final ResourceArenaManager resArenaMgr;
+    private final RequestArenaManager reqArenaMgr;
+    private final JobArenaManager jobArenaMgr;
+    private final ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+
+    DumpTablePrinter(ResourceGroupTable table,
+                     ResourceArenaManager resArenaMgr,
+                     RequestArenaManager reqArenaMgr,
+                     JobArenaManager jobArenaMgr,
+                     ConcurrentHashMap<Integer, Long> jobIdSlotMap) {
+        this.table = table;
+        this.resArenaMgr = resArenaMgr;
+        this.reqArenaMgr = reqArenaMgr;
+        this.jobArenaMgr = jobArenaMgr;
+        this.jobIdSlotMap = jobIdSlotMap;
+    }
+
+    /**
+     * Appends the complete dump to {@code sb}. All latches of the table are acquired before the
+     * first section is written and released in a {@code finally} block.
+     *
+     * @param sb the builder that receives the dump
+     * @return {@code sb}
+     */
+    @Override
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        try {
+            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+            table.append(sb);
+            sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+            resArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+            reqArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+            // iterate over the entries instead of looking every key up again: a second get() could
+            // return null (and fail on unboxing) if the entry is removed while the dump is written
+            for (java.util.Map.Entry<Integer, Long> entry : jobIdSlotMap.entrySet()) {
+                sb.append(entry.getKey()).append(" : ");
+                TypeUtil.Global.append(sb, entry.getValue());
+                sb.append("\n");
+            }
+            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+            jobArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+        } finally {
+            table.releaseAllLatches();
+        }
+        return sb;
+    }
+
+    /**
+     * @param resSlot the slot of a resource
+     * @return the text that {@link #appendResQueue(StringBuilder, long)} produces for {@code resSlot}
+     */
+    String resQueueToString(long resSlot) {
+        return appendResQueue(new StringBuilder(), resSlot).toString();
+    }
+
+    /**
+     * Appends the record of a resource and, one per line, the records of the requests that are
+     * reachable from the resource's last holder.
+     *
+     * @param sb      the builder that receives the text
+     * @param resSlot the slot of the resource
+     * @return {@code sb}
+     */
+    StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
+        resArenaMgr.appendRecord(sb, resSlot);
+        sb.append("\n");
+        appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
+        return sb;
+    }
+
+    /**
+     * Appends the records of a chain of requests, one per line. The chain is followed through
+     * {@code getNextRequest} until the slot -1 is reached.
+     *
+     * @param sb   the builder that receives the text
+     * @param head the slot of the first request, or -1 for an empty chain
+     * @return {@code sb}
+     */
+    StringBuilder appendReqQueue(StringBuilder sb, long head) {
+        while (head != -1) {
+            reqArenaMgr.appendRecord(sb, head);
+            sb.append("\n");
+            head = reqArenaMgr.getNextRequest(head);
+        }
+        return sb;
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index aaa96bb..1dbf16b 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -392,7 +392,6 @@
 class LockRequestWorker implements Runnable {
 
     String threadName;
-    TransactionSubsystem txnProvider;
     ILockManager lockMgr;
     WorkerReadyQueue workerReadyQueue;
     LockRequest lockRequest;
@@ -401,7 +400,6 @@
     boolean isDone;
 
     public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) {
-        this.txnProvider = txnProvider;
         this.lockMgr = txnProvider.getLockManager();
         this.workerReadyQueue = workerReadyQueue;
         this.threadName = new String(threadName);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
new file mode 100644
index 0000000..ff93fa5
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+/**
+ * A {@link TablePrinter} that writes the lock table as a JSON array with one object per
+ * resource. Each object lists the dataset, the hash value and the max mode of the resource
+ * and, if present, its holders, upgraders and waiters.
+ */
+public class ResourceTablePrinter implements TablePrinter {
+    private final ResourceGroupTable table;
+    private final ResourceArenaManager resArenaMgr;
+    private final RequestArenaManager reqArenaMgr;
+    private final JobArenaManager jobArenaMgr;
+
+    ResourceTablePrinter(ResourceGroupTable table,
+                         ResourceArenaManager resArenaMgr,
+                         RequestArenaManager reqArenaMgr,
+                         JobArenaManager jobArenaMgr) {
+        this.table = table;
+        this.resArenaMgr = resArenaMgr;
+        this.reqArenaMgr = reqArenaMgr;
+        this.jobArenaMgr = jobArenaMgr;
+    }
+
+    /**
+     * Appends all resources of the table to {@code sb}. All latches of the table are acquired
+     * before the table is read and released in a {@code finally} block, so that they are also
+     * released if printing fails.
+     *
+     * @param sb the builder that receives the text
+     * @return {@code sb}
+     */
+    @Override
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        try {
+            sb.append("[\n");
+            int i = 0;
+            long res = -1;
+            // find the first non-empty bucket
+            while (res == -1 && i < table.size) {
+                res = table.get(i++).firstResourceIndex.get();
+            }
+            // loop on the resource and not on the bucket index, otherwise a chain that starts in
+            // the last bucket would never be printed
+            while (res != -1) {
+                appendResource(sb, res);
+                res = resArenaMgr.getNext(res);
+                // end of this bucket's chain: continue with the next non-empty bucket
+                while (res == -1 && i < table.size) {
+                    res = table.get(i++).firstResourceIndex.get();
+                }
+                sb.append(res == -1 ? "\n" : ",\n");
+            }
+            return sb.append("]");
+        } finally {
+            table.releaseAllLatches();
+        }
+    }
+
+    /**
+     * Appends one resource as a JSON object. The "holders", "upgraders" and "waiters" members
+     * are only written if the corresponding chain is not empty.
+     *
+     * @param sb  the builder that receives the text
+     * @param res the slot of the resource
+     * @return {@code sb}
+     */
+    StringBuilder appendResource(StringBuilder sb, long res) {
+        sb.append("{ \"dataset\": ").append(resArenaMgr.getDatasetId(res));
+        sb.append(", \"hash\": ").append(resArenaMgr.getPkHashVal(res));
+        // the mode name is a string and has to be quoted, as it is for the "mode" of a request
+        sb.append(", \"max mode\": \"").append(string(resArenaMgr.getMaxMode(res))).append("\"");
+        long lastHolder = resArenaMgr.getLastHolder(res);
+        if (lastHolder != -1) {
+            appendRequests(sb.append(", \"holders\": "), lastHolder);
+        }
+        long firstUpgrader = resArenaMgr.getFirstUpgrader(res);
+        if (firstUpgrader != -1) {
+            appendRequests(sb.append(", \"upgraders\": "), firstUpgrader);
+        }
+        long firstWaiter = resArenaMgr.getFirstWaiter(res);
+        if (firstWaiter != -1) {
+            appendRequests(sb.append(", \"waiters\": "), firstWaiter);
+        }
+        return sb.append(" }");
+    }
+
+    /**
+     * Appends a chain of requests as a JSON array. The chain is followed through
+     * {@code getNextRequest} until the slot -1 is reached; the closing bracket is always
+     * written, also for an empty chain.
+     *
+     * @param sb  the builder that receives the text
+     * @param req the slot of the first request, or -1 for an empty chain
+     * @return {@code sb}
+     */
+    StringBuilder appendRequests(StringBuilder sb, long req) {
+        sb.append("[ ");
+        while (req != -1) {
+            appendRequest(sb, req);
+            req = reqArenaMgr.getNextRequest(req);
+            if (req != -1) {
+                sb.append(", ");
+            }
+        }
+        return sb.append(" ]");
+    }
+
+    /**
+     * Appends one request as a JSON object with the job id and the lock mode.
+     *
+     * @param sb  the builder that receives the text
+     * @param req the slot of the request
+     * @return {@code sb}
+     */
+    StringBuilder appendRequest(StringBuilder sb, long req) {
+        long job = reqArenaMgr.getJobSlot(req);
+        sb.append("{ \"job\": ").append(jobArenaMgr.getJobId(job));
+        sb.append(", \"mode\": \"").append(string(reqArenaMgr.getLockMode(req)));
+        return sb.append("\" }");
+    }
+
+    /**
+     * @param lockMode a lock mode; it is narrowed to a byte before it is converted
+     * @return the result of {@code LockMode.toString} for the mode
+     */
+    private static String string(int lockMode) {
+        return LockMode.toString((byte) lockMode);
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
new file mode 100644
index 0000000..2b4260b
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+/**
+ * Implemented by the printers in this package that write a textual representation of the
+ * lock manager's tables.
+ */
+public interface TablePrinter {
+    /**
+     * Appends the representation to a builder.
+     *
+     * @param sb the builder that receives the text
+     * @return a builder that holds the appended text (the implementations in this package return {@code sb})
+     */
+    StringBuilder append(StringBuilder sb);
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09fbb06..3f1f56a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -51,7 +51,7 @@
         this.id = id;
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
-        this.lockManager = new ConcurrentLockManager(this);
+        this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
         this.logManager = new LogManager(this);
         this.recoveryManager = new RecoveryManager(this);
         if (asterixAppRuntimeContextProvider != null) {
@@ -92,4 +92,4 @@
         return id;
     }
 
-}
\ No newline at end of file
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
new file mode 100644
index 0000000..43b1a30
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
@@ -0,0 +1,332 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.asterix.transaction.management.service.locking;
+
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Logger;
+
+import static org.apache.asterix.transaction.management.service.locking.Request.Kind;
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link ConcurrentLockManager}. A test is described as a list of
+ * {@link Request}s; every job that occurs in the list is run by its own {@link Locker} thread
+ * and the requests are executed in list order.
+ */
+public class LockManagerUnitTest {
+
+    public static int LOCK_MGR_SHRINK_TIMER = 5000;
+    public static int LOCK_MGR_ARENAS = 2;
+    public static int LOCK_MGR_TABLE_SIZE = 10;
+
+    static int INITIAL_TIMESTAMP = 0;
+    static long PER_REQUEST_TIMEOUT_MS = 2000;
+
+    static {
+        //Logger.getLogger(ConcurrentLockManager.class.getName()).addHandler(new ConsoleHandler());
+    }
+
+    Map<Integer, ITransactionContext> jobMap;
+    ILockManager lockMgr;
+
+    // set to e.g. System.err to get some output
+    PrintStream out = System.out;
+    PrintStream err = null;//System.err;
+
+    //--------------------------------------------------------------------
+    // JUnit methods
+    //--------------------------------------------------------------------
+
+    @Before
+    public void setUp() throws Exception {
+        jobMap = new HashMap<>();
+        lockMgr = new ConcurrentLockManager(LOCK_MGR_SHRINK_TIMER, LOCK_MGR_ARENAS, LOCK_MGR_TABLE_SIZE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        lockMgr = null;
+        jobMap = null;
+    }
+
+    @Test
+    public void testSimpleSharedUnlock() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.UNLOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.UNLOCK, j(1), d(1), e(-1), LockMode.IS));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testSimpleSharedRelease() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testReacquire() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testDeadlock() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(2), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(2), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reqs.add(req(Kind.RELEASE, j(2)));
+        expectError(execute(reqs), j(1), ACIDException.class);
+    }
+
+    @Test
+    public void testUpgrade() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testUpgrade2() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.PRINT));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reqs.add(req(Kind.RELEASE, j(2)));
+        expectError(execute(reqs), j(2), ACIDException.class);
+    }
+
+    //--------------------------------------------------------------------
+    // Helper methods
+    //--------------------------------------------------------------------
+
+    /**
+     * execute a list of requests where
+     * a) each job runs in a different thread and
+     * b) the threads/jobs are synchronized
+     * (i.e. the requests are executed in list order)
+     *
+     * While the lockers are active the coordinator checks every 10 ms if the shared timestamp
+     * has advanced. If it has not, a locker that reports a timeout (if there is one) is
+     * interrupted.
+     *
+     * @param reqs a list of requests that will be executed in order
+     * @return a map of (JobId, exception) pairs that can either be handled
+     * by the test or thrown using #reportErrors
+     */
+    private Map<String, Throwable> execute(List<Request> reqs) throws InterruptedException {
+        if (err != null) {
+            err.println("*** start ***");
+        }
+        final AtomicInteger timeStamp = new AtomicInteger(INITIAL_TIMESTAMP);
+        Set<Locker> lockers = createLockers(reqs, timeStamp);
+        Map<String, Thread> threads = startThreads(lockers);
+
+        while (active(lockers)) {
+            // re-read the clock in every round, a value that is read only once would differ from
+            // the timestamp forever after the first request and the loop would spin without
+            // ever looking for timeouts again
+            int coordinatorTime = timeStamp.get();
+            Thread.sleep(10);
+            if (coordinatorTime == timeStamp.get()) {
+                Locker timedOut = timedOut(lockers);
+                // no progress does not imply that a locker has timed out already
+                if (timedOut != null) {
+                    threads.get(timedOut.name).interrupt();
+                }
+            }
+        }
+        return stopThreads(lockers, threads);
+    }
+
+    /**
+     * @param lockers the lockers to check
+     * @return true if every locker is active, false as soon as one of them is not
+     */
+    private boolean active(Set<Locker> lockers) {
+        for (Locker locker : lockers) {
+            if (!locker.active()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @param lockers the lockers to check
+     * @return a locker that reports a timeout or null if there is none
+     */
+    private Locker timedOut(Set<Locker> lockers) {
+        for (Locker locker : lockers) {
+            if (locker.timedOut()) {
+                return locker;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * creates one locker for each transaction context that is registered in the job map
+     *
+     * @param reqs      the complete list of requests, it is passed to each locker
+     * @param timeStamp the clock that is shared by all lockers
+     * @return the set of lockers
+     */
+    private Set<Locker> createLockers(List<Request> reqs, AtomicInteger timeStamp) {
+        Set<Locker> lockers = new HashSet<>();
+        for (ITransactionContext txnCtx : jobMap.values()) {
+            Locker locker = new Locker(lockMgr, txnCtx, reqs, timeStamp, out, err);
+            lockers.add(locker);
+        }
+        return lockers;
+    }
+
+    /**
+     * starts one thread per locker, the thread is named after the locker
+     *
+     * @param lockers the lockers to run
+     * @return a map of (locker name, thread) pairs
+     */
+    private Map<String, Thread> startThreads(Set<Locker> lockers) {
+        Map<String, Thread> threads = new HashMap<>(lockers.size());
+        for (Locker locker : lockers) {
+            Thread t = new Thread(locker, locker.name);
+            threads.put(locker.name, t);
+            t.start();
+        }
+        return threads;
+    }
+
+    /**
+     * stops the threads of all lockers and collects the errors that the lockers have recorded
+     *
+     * @param lockers the lockers whose threads are stopped
+     * @param threads a map of (locker name, thread) pairs
+     * @return a map of (locker name, throwable) pairs, lockers without an error have no entry
+     */
+    private Map<String, Throwable> stopThreads(Set<Locker> lockers, Map<String, Thread> threads) throws InterruptedException {
+        Map<String, Throwable> result = new HashMap<>();
+        for (Locker locker : lockers) {
+            stopThread(threads.get(locker.name));
+            Throwable error = locker.getError();
+            if (error != null) {
+                result.put(locker.name, error);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * interrupts a thread until its state is NEW, RUNNABLE or TERMINATED and then joins it
+     *
+     * @param t the thread to stop
+     */
+    private void stopThread(Thread t) throws InterruptedException {
+        if (err != null) {
+            err.println("stopping " + t.getName() + " " + t.getState());
+        }
+        boolean done = false;
+        while (!done) {
+            switch (t.getState()) {
+                case NEW:
+                case RUNNABLE:
+                case TERMINATED:
+                    done = true;
+                    break;
+                default:
+                    if (err != null) {
+                        err.println("interrupting " + t.getName());
+                    }
+                    t.interrupt();
+            }
+        }
+        if (err != null) {
+            err.println("joining " + t.getName());
+        }
+        t.join();
+    }
+
+    /**
+     * throws the first Throwable found in the map.
+     * This is the default way to handle the errors returned by #execute
+     *
+     * @param errors a map of (JobId, exception) pairs
+     */
+    void reportErrors(Map<String, Throwable> errors) {
+        for (Map.Entry<String, Throwable> entry : errors.entrySet()) {
+            throw new AssertionError("job " + entry.getKey() + " caught something", entry.getValue());
+        }
+        out.println("no errors");
+    }
+
+    /**
+     * gets the error for a specific job from the errors map
+     *
+     * @param errors a map of (JobId, throwable) pairs
+     * @param txnCtx the transaction context of the job whose error is requested
+     * @return throwable for said error
+     */
+    private static Throwable getError(Map<String, Throwable> errors, ITransactionContext txnCtx) {
+        return errors.get(txnCtx.getJobId().toString());
+    }
+
+    /**
+     * asserts that the error for a specific job from the errors map is of a specific class
+     *
+     * @param errors a map of (JobId, throwable) pairs
+     * @param txnCtx the transaction context of the job whose error is requested
+     * @param clazz  the exception class
+     * @throws AssertionError if there is no error for the job or if the error is not an instance of clazz
+     */
+    private void expectError(Map<String, Throwable> errors, ITransactionContext txnCtx, Class<? extends Throwable> clazz) throws Exception {
+        Throwable error = getError(errors, txnCtx);
+        if (error == null) {
+            // a missing error is a failed expectation, report it as such and say what was expected
+            throw new AssertionError("expected " + clazz.getName() + " for job " + txnCtx.getJobId()
+                    + " but no error was reported");
+        }
+        if (!clazz.isInstance(error)) {
+            throw new AssertionError(error);
+        }
+        out.println("caught expected " + error);
+    }
+
+    //--------------------------------------------------------------------
+    // Convenience methods to make test description more compact
+    //--------------------------------------------------------------------
+
+    private Request req(final Kind kind, final ITransactionContext txnCtx,
+                        final DatasetId dsId, final int hashValue, final byte lockMode) {
+        return Request.create(kind, txnCtx, dsId, hashValue, lockMode);
+    }
+
+    private Request req(final Kind kind, final ITransactionContext txnCtx) {
+        return Request.create(kind, txnCtx);
+    }
+
+    private Request req(final Kind kind) {
+        return Request.create(kind, out);
+    }
+
+    private static DatasetId d(int id) {
+        return new DatasetId(id);
+    }
+
+    private static int e(int i) {
+        return i;
+    }
+
+    /**
+     * gets the mocked transaction context for a job id, the mock is created and registered in
+     * the job map on first use
+     *
+     * @param jId the job id
+     * @return the transaction context for the job
+     */
+    private ITransactionContext j(int jId) {
+        if (!jobMap.containsKey(jId)) {
+            ITransactionContext mockTxnContext = mock(ITransactionContext.class);
+            when(mockTxnContext.getJobId()).thenReturn(new JobId(jId));
+            jobMap.put(jId, mockTxnContext);
+        }
+        return jobMap.get(jId);
+    }
+
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
new file mode 100644
index 0000000..d5c3404
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
@@ -0,0 +1,149 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.junit.Assert;
+
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs the requests of one job for the lock manager tests. All lockers share a clock
+ * ({@code globalTime}); a request is executed when the clock has reached the position of the
+ * request in the complete request list and the clock is advanced by each executed request.
+ */
+class Locker implements Runnable {
+
+    private static final int TIMEOUT_MS = 1000;
+
+    public String name;
+
+    ILockManager lockMgr;
+    List<Requester> requests;
+    Iterator<Requester> curRequest;
+    AtomicInteger globalTime;
+    int localTime;
+    private Throwable error;
+    // start of the request that is currently executed in ms, -1 if no request is executed.
+    // Kept as a long as narrowing System.currentTimeMillis() to an int yields negative values
+    // for long periods of time; volatile as it is written by the locker's thread and read by
+    // the thread that calls timedOut()
+    private volatile long reqStart = -1;
+
+    PrintStream out;
+    PrintStream err;
+
+    /**
+     * @param lockMgr     the lock manager that executes the requests
+     * @param txnCtx      the transaction context of the job, its job id is used as the name of the locker
+     * @param allRequests the requests of all jobs; the locker picks those that belong to txnCtx or
+     *                    that have no transaction context and remembers their position in the list
+     * @param time        the clock that is shared by all lockers
+     * @param out         the stream for regular output
+     * @param err         the stream for log output, may be null to suppress it
+     */
+    Locker(ILockManager lockMgr, ITransactionContext txnCtx, List<Request> allRequests, AtomicInteger time, PrintStream out, PrintStream err) {
+        this.name = txnCtx.getJobId().toString();
+        this.lockMgr = lockMgr;
+
+        this.requests = new LinkedList<>();
+        for (int pos = 0; pos < allRequests.size(); ++pos) {
+            Request req = allRequests.get(pos);
+            if (req.txnCtx == txnCtx || req.txnCtx == null) {
+                this.requests.add(new Requester(pos, req));
+            }
+        }
+        this.curRequest = requests.iterator();
+
+        this.globalTime = time;
+        this.localTime = -1;
+
+        this.out = out;
+        this.err = err;
+    }
+
+    /**
+     * @return the ACIDException that was caught while a request was executed or null
+     */
+    synchronized Throwable getError() {
+        return error;
+    }
+
+    private synchronized void setError(Throwable error) {
+        log("caught " + error);
+        this.error = error;
+    }
+
+    /**
+     * @return true if there are requests left and no error has been recorded
+     */
+    public synchronized boolean active() {
+        return curRequest.hasNext() && error == null;
+    }
+
+    /**
+     * @return true if a request is being executed and was started more than TIMEOUT_MS ms ago
+     */
+    public synchronized boolean timedOut() {
+        // read the field once, the locker's thread can reset it at any time
+        final long start = reqStart;
+        return start >= 0 && (System.currentTimeMillis() - start) > TIMEOUT_MS;
+    }
+
+    /**
+     * Executes the requests of this locker in order. Before a request is executed the locker
+     * polls the shared clock until the time of the request is reached. An ACIDException is
+     * recorded as the error of the locker and ends the loop, any other Throwable fails an
+     * assertion.
+     */
+    @Override
+    public void run() {
+        log("running");
+        try {
+            while (error == null && curRequest.hasNext()) {
+                Requester req = curRequest.next();
+                localTime = globalTime.get();
+                while (localTime < req.time) {
+                    Thread.sleep(10);
+                    localTime = globalTime.get();
+                }
+                log("will exec at t=" + localTime + " " + req);
+                try {
+                    reqStart = System.currentTimeMillis();
+                    Assert.assertEquals(localTime, globalTime.getAndIncrement());
+                    req.setResult(req.request.execute(lockMgr) ? Requester.SUCCESS : Requester.FAIL);
+                } catch (ACIDException e) {
+                    req.setResult(Requester.ERROR);
+                    setError(e);
+                } finally {
+                    reqStart = -1;
+                }
+                log("time " + localTime);
+            }
+        } catch (Throwable e) {
+            if (err != null) {
+                e.printStackTrace(err);
+            }
+            Assert.fail(e.toString());
+        }
+        log("done");
+    }
+
+    private void log(String msg) {
+        if (err != null) {
+            err.println(Thread.currentThread().getName() + " " + msg);
+        }
+    }
+}
+
+/**
+ * Pairs a request with the time at which it is to be executed and with the result of its
+ * execution.
+ */
+class Requester {
+
+    public static byte NONE = -1;
+    public static byte FAIL = 0;
+    public static byte SUCCESS = 1;
+    public static byte ERROR = 2;
+
+    int time;
+    Request request;
+    byte result = NONE;
+
+    /**
+     * @param time    the time at which the request is to be executed
+     * @param request the request
+     */
+    Requester(int time, Request request) {
+        this.request = request;
+        this.time = time;
+    }
+
+    /**
+     * @param res the result of the execution, one of the constants of this class
+     */
+    void setResult(byte res) {
+        this.result = res;
+    }
+
+    /**
+     * @return the string form of the request followed by " t=" and the time
+     */
+    public String toString() {
+        final StringBuilder text = new StringBuilder(request.toString());
+        text.append(" t=").append(time);
+        return text.toString();
+    }
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
new file mode 100644
index 0000000..c7e0c42
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import java.io.PrintStream;
+
+/**
+ * Represents a lock request for testing.
+ * <p>
+ * Instances are obtained through the {@code create} factory methods. Each instance performs its
+ * operation against the lock manager that is passed to {@link #execute(ILockManager)}.
+ */
+abstract class Request {
+    /** the kind of a request */
+    enum Kind {
+        /** requests an instant-try-lock */
+        INSTANT_TRY_LOCK,
+        /** requests an instant-lock */
+        INSTANT_LOCK,
+        /** requests a lock */
+        LOCK,
+        /** prints the lock table as returned by {@code ConcurrentLockManager.printByResource()} */
+        PRINT,
+        /** releases all locks */
+        RELEASE,
+        /** requests a try-lock */
+        TRY_LOCK,
+        /** unlocks a lock */
+        UNLOCK
+    }
+
+    /** the kind of this request */
+    Kind kind;
+    /** the transaction context the request is issued for; {@code null} for {@link Kind#PRINT} */
+    ITransactionContext txnCtx;
+
+    Request(Kind kind, ITransactionContext txnCtx) {
+        this.kind = kind;
+        this.txnCtx = txnCtx;
+    }
+
+    /**
+     * Renders the parameters of a lock-style request as
+     * {@code jobId:kind:datasetId:hashValue:lockMode}, where the lock mode is formatted by
+     * {@code LockMode.toString(byte)}.
+     *
+     * @return the colon-separated description
+     */
+    String asString(final Kind kind, final ITransactionContext txnCtx,
+                    final DatasetId dsId, final int hashValue, final byte lockMode) {
+        return txnCtx.getJobId().toString() + ":" + kind.name() + ":" + dsId.getId() + ":" + hashValue + ":"
+                + TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode);
+    }
+
+    /**
+     * Performs this request against {@code lockMgr}.
+     *
+     * @param lockMgr
+     *            the lock manager to operate on
+     * @return the result of the lock manager call if it returns one, {@code true} for calls without
+     *         a result, and {@code false} if a {@link Kind#PRINT} request could not print
+     * @throws ACIDException
+     *             if the lock manager call throws it
+     */
+    abstract boolean execute(ILockManager lockMgr) throws ACIDException;
+
+    /**
+     * Common base of all requests that address a dataset / hash value / lock mode triple. It holds
+     * the triple and provides the shared {@link #toString()} so that the concrete requests only
+     * have to supply {@link #execute(ILockManager)}.
+     */
+    private abstract static class LockRequest extends Request {
+        final DatasetId dsId;
+        final int hashValue;
+        final byte lockMode;
+
+        LockRequest(Kind kind, ITransactionContext txnCtx, DatasetId dsId, int hashValue, byte lockMode) {
+            super(kind, txnCtx);
+            this.dsId = dsId;
+            this.hashValue = hashValue;
+            this.lockMode = lockMode;
+        }
+
+        @Override
+        public String toString() {
+            return asString(kind, txnCtx, dsId, hashValue, lockMode);
+        }
+    }
+
+    /**
+     * Creates a request of one of the kinds {@link Kind#INSTANT_TRY_LOCK}, {@link Kind#INSTANT_LOCK},
+     * {@link Kind#LOCK}, {@link Kind#TRY_LOCK} or {@link Kind#UNLOCK}.
+     *
+     * @return a request that forwards the given parameters to the corresponding lock manager method
+     * @throws AssertionError
+     *             if {@code kind} is any other kind
+     */
+    static Request create(final Kind kind, final ITransactionContext txnCtx,
+                          final DatasetId dsId, final int hashValue, final byte lockMode) {
+        switch (kind) {
+            case INSTANT_TRY_LOCK:
+                return new LockRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        return lockMgr.instantTryLock(dsId, hashValue, lockMode, txnCtx);
+                    }
+                };
+            case INSTANT_LOCK:
+                return new LockRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.instantLock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            case LOCK:
+                return new LockRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.lock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            case TRY_LOCK:
+                return new LockRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        return lockMgr.tryLock(dsId, hashValue, lockMode, txnCtx);
+                    }
+                };
+            case UNLOCK:
+                return new LockRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.unlock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            default:
+                throw new AssertionError("Illegal Request Kind " + kind);
+        }
+    }
+
+    /**
+     * Creates a {@link Kind#RELEASE} request that calls {@code releaseLocks} for {@code txnCtx}.
+     *
+     * @throws AssertionError
+     *             if {@code kind} is not {@link Kind#RELEASE}
+     */
+    static Request create(final Kind kind, final ITransactionContext txnCtx) {
+        if (kind == Kind.RELEASE) {
+            return new Request(kind, txnCtx) {
+                @Override
+                boolean execute(ILockManager lockMgr) throws ACIDException {
+                    lockMgr.releaseLocks(txnCtx);
+                    return true;
+                }
+
+                @Override
+                public String toString() {
+                    return txnCtx.getJobId().toString() + ":" + kind.name();
+                }
+            };
+        }
+        throw new AssertionError("Illegal Request Kind " + kind);
+    }
+
+    /**
+     * Creates a {@link Kind#PRINT} request that writes the result of
+     * {@code ConcurrentLockManager.printByResource()} to {@code out}.
+     * <p>
+     * Executing the request returns {@code false} without printing the table if {@code out} is
+     * {@code null} or if the lock manager is not a {@link ConcurrentLockManager}; in the latter
+     * case {@code "cannot print"} is written to {@code out}.
+     *
+     * @throws AssertionError
+     *             if {@code kind} is not {@link Kind#PRINT}
+     */
+    static Request create(final Kind kind, final PrintStream out) {
+        if (kind == Kind.PRINT) {
+            return new Request(kind, null) {
+                @Override
+                boolean execute(ILockManager lockMgr) throws ACIDException {
+                    if (out == null) {
+                        return false;
+                    }
+                    if (!(lockMgr instanceof ConcurrentLockManager)) {
+                        out.print("cannot print");
+                        return false;
+                    }
+                    out.print(((ConcurrentLockManager) lockMgr).printByResource());
+                    return true;
+                }
+
+                @Override
+                public String toString() {
+                    return kind.name();
+                }
+            };
+        }
+        throw new AssertionError("Illegal Request Kind " + kind);
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/474
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If49ed8d48fa8c71a52c880d4f42a2badbe6a57d7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <tillw@apache.org>

Mime
View raw message