Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 960E8200C05 for ; Mon, 23 Jan 2017 18:52:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 94A6A160B3C; Mon, 23 Jan 2017 17:52:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4F6D6160B53 for ; Mon, 23 Jan 2017 18:52:51 +0100 (CET) Received: (qmail 84467 invoked by uid 500); 23 Jan 2017 17:52:45 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 84442 invoked by uid 99); 23 Jan 2017 17:52:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jan 2017 17:52:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2A6BEDF9FC; Mon, 23 Jan 2017 17:52:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Mon, 23 Jan 2017 17:52:46 -0000 Message-Id: <73fe0faa2cc74bd8a06bcbf393fad78d@git.apache.org> In-Reply-To: <5eb385adb9b34eb784dcd6a49abd199a@git.apache.org> References: <5eb385adb9b34eb784dcd6a49abd199a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hbase git commit: HBASE-17067 Procedure v2 - remove zklock/tryLock and use wait/wake (Matteo Bertozzi) archived-at: Mon, 23 Jan 2017 17:52:53 -0000 HBASE-17067 Procedure v2 - remove zklock/tryLock and use wait/wake (Matteo Bertozzi) This is an amalgam of https://reviews.apache.org/r/54435/ and 
https://github.com/matteobertozzi/hbase/commit/9c14863594a8ff67e406d1e0efe0a874f71b858c Removes notion of suspend/resume from procedure. Instead have the below lock states and just unschedule if lock is not yet available LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to execute. LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should take care of readding the procedure back to the runnable set for retry LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take care of readding the procedure back to the runnable set when the lock is available. Side benefit is being able to undo a bunch of synchronization around procedure management. Signed-off-by: Michael Stack Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/980c8c20 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/980c8c20 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/980c8c20 Branch: refs/heads/master Commit: 980c8c204775e789fae057a6383ff4b725067b83 Parents: ba4a926 Author: Michael Stack Authored: Thu Jan 19 14:11:53 2017 -0800 Committer: Michael Stack Committed: Mon Jan 23 09:29:16 2017 -0800 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 2 - .../hadoop/hbase/procedure2/Procedure.java | 54 ++-- .../hbase/procedure2/ProcedureExecutor.java | 77 +++-- .../procedure2/store/wal/WALProcedureStore.java | 1 - .../procedure2/ProcedureTestingUtility.java | 52 ++-- .../procedure2/TestProcedureSuspended.java | 5 +- .../org/apache/hadoop/hbase/master/HMaster.java | 2 +- .../hbase/master/locking/LockProcedure.java | 28 +- .../AbstractStateMachineNamespaceProcedure.java | 13 +- .../AbstractStateMachineTableProcedure.java | 13 +- .../procedure/CreateNamespaceProcedure.java | 9 +- .../master/procedure/CreateTableProcedure.java | 9 +- 
.../master/procedure/MasterProcedureEnv.java | 5 - .../procedure/MasterProcedureScheduler.java | 279 ++++++++++--------- .../procedure/MergeTableRegionsProcedure.java | 12 +- .../master/procedure/ServerCrashProcedure.java | 13 +- .../procedure/SplitTableRegionProcedure.java | 7 +- .../hbase/master/locking/TestLockManager.java | 1 - ...ProcedureSchedulerPerformanceEvaluation.java | 40 ++- .../MasterProcedureTestingUtility.java | 13 +- .../procedure/TestMasterProcedureEvents.java | 11 +- .../procedure/TestMasterProcedureScheduler.java | 124 ++++----- ...TestMasterProcedureSchedulerConcurrency.java | 76 +---- 23 files changed, 425 insertions(+), 421 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index dc94983..ff8d978 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -289,12 +289,10 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) { - procedure.suspend(); event.suspendProcedure(procedure); } protected void wakeProcedure(final Procedure procedure) { - procedure.resume(); push(procedure, /* addFront= */ true, /* notify= */false); } http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 3f3cf33..fee5250 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -59,7 +59,13 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public abstract class Procedure implements Comparable { public static final long NO_PROC_ID = -1; - public static final int NO_TIMEOUT = -1; + protected static final int NO_TIMEOUT = -1; + + public enum LockState { + LOCK_ACQUIRED, // lock acquired and ready to execute + LOCK_YIELD_WAIT, // lock not acquired, framework needs to yield + LOCK_EVENT_WAIT, // lock not acquired, an event will yield the procedure + } // unchanged after initialization private NonceKey nonceKey = null; @@ -80,9 +86,6 @@ public abstract class Procedure implements Comparable { private volatile byte[] result = null; - // TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks - private boolean suspended = false; - /** * The main code of the procedure. It must be idempotent since execute() * may be called multiple time in case of machine failure in the middle @@ -142,14 +145,23 @@ public abstract class Procedure implements Comparable { /** * The user should override this method, and try to take a lock if necessary. * A lock can be anything, and it is up to the implementor. - * Example: in our Master we can execute request in parallel for different tables - * create t1 and create t2 can be executed at the same time. - * anything else on t1/t2 is queued waiting that specific table create to happen. * - * @return true if the lock was acquired and false otherwise + *

Example: in our Master we can execute requests in parallel for different tables. + * We can create t1 and create t2 and these can be executed at the same time. + * Anything else on t1/t2 is queued, waiting for that specific table creation to complete. + *

There are 3 LockStates: + *

  • LOCK_ACQUIRED should be returned when the proc has the lock and the proc is + * ready to execute.
  • + *
  • LOCK_YIELD_WAIT should be returned when the proc does not have the lock and the framework + * should take care of re-adding the procedure back to the runnable set for retry.
  • + *
  • LOCK_EVENT_WAIT should be returned when the proc does not have the lock and someone else will + * take care of re-adding the procedure back to the runnable set when the lock is available.
+ * @return the lock state as described above. */ - protected boolean acquireLock(final TEnvironment env) { - return true; + protected LockState acquireLock(final TEnvironment env) { + return LockState.LOCK_ACQUIRED; } /** @@ -301,9 +313,6 @@ public abstract class Procedure implements Comparable { */ protected void toStringState(StringBuilder builder) { builder.append(getState()); - if (isSuspended()) { - builder.append("|SUSPENDED"); - } } /** @@ -495,23 +504,6 @@ public abstract class Procedure implements Comparable { // ============================================================================================== /** - * @return true if the procedure is in a suspended state, - * waiting for the resources required to execute the procedure will become available. - */ - public synchronized boolean isSuspended() { - return suspended; - } - - public synchronized void suspend() { - suspended = true; - } - - public synchronized void resume() { - assert isSuspended() : this + " expected suspended state, got " + state; - suspended = false; - } - - /** * @return true if the procedure is in a RUNNABLE state. */ protected synchronized boolean isRunnable() { @@ -737,7 +729,7 @@ public abstract class Procedure implements Comparable { * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). 
*/ @InterfaceAudience.Private - protected boolean doAcquireLock(final TEnvironment env) { + protected LockState doAcquireLock(final TEnvironment env) { return acquireLock(env); } http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 0912cb7..c5f6daf 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; +import org.apache.hadoop.hbase.procedure2.Procedure.LockState; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; @@ -255,6 +256,7 @@ public class ProcedureExecutor { private final AtomicBoolean running = new AtomicBoolean(false); private final TEnvironment environment; private final ProcedureStore store; + private final boolean checkOwnerSet; public ProcedureExecutor(final Configuration conf, final TEnvironment environment, @@ -1090,17 +1092,34 @@ public class ProcedureExecutor { if (!procStack.acquire(proc)) { if (procStack.setRollback()) { // we have the 'rollback-lock' we can start rollingback - if (!executeRollback(rootProcId, procStack)) { - procStack.unsetRollback(); - scheduler.yield(proc); + switch (executeRollback(rootProcId, procStack)) { + case 
LOCK_ACQUIRED: + break; + case LOCK_YIELD_WAIT: + scheduler.yield(proc); + procStack.unsetRollback(); + break; + case LOCK_EVENT_WAIT: + procStack.unsetRollback(); + break; + default: + throw new UnsupportedOperationException(); } } else { // if we can't rollback means that some child is still running. // the rollback will be executed after all the children are done. // If the procedure was never executed, remove and mark it as rolledback. if (!proc.wasExecuted()) { - if (!executeRollback(proc)) { - scheduler.yield(proc); + switch (executeRollback(proc)) { + case LOCK_ACQUIRED: + break; + case LOCK_YIELD_WAIT: + scheduler.yield(proc); + break; + case LOCK_EVENT_WAIT: + break; + default: + throw new UnsupportedOperationException(); } } } @@ -1109,11 +1128,19 @@ public class ProcedureExecutor { // Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - if (acquireLock(proc)) { - execProcedure(procStack, proc); - releaseLock(proc, false); - } else { - scheduler.yield(proc); + switch (acquireLock(proc)) { + case LOCK_ACQUIRED: + execProcedure(procStack, proc); + releaseLock(proc, false); + break; + case LOCK_YIELD_WAIT: + scheduler.yield(proc); + break; + case LOCK_EVENT_WAIT: + // someone will wake us up when the lock is available + break; + default: + throw new UnsupportedOperationException(); } procStack.release(proc); @@ -1139,13 +1166,13 @@ public class ProcedureExecutor { } while (procStack.isFailed()); } - private boolean acquireLock(final Procedure proc) { + private LockState acquireLock(final Procedure proc) { final TEnvironment env = getEnvironment(); // hasLock() is used in conjunction with holdLock(). // This allows us to not rewrite or carry around the hasLock() flag // for every procedure. the hasLock() have meaning only if holdLock() is true. 
if (proc.holdLock(env) && proc.hasLock(env)) { - return true; + return LockState.LOCK_ACQUIRED; } return proc.doAcquireLock(env); } @@ -1164,7 +1191,7 @@ public class ProcedureExecutor { * Once the procedure is rolledback, the root-procedure will be visible as * finished to user, and the result will be the fatal exception. */ - private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) { + private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { final Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); if (exception == null) { @@ -1181,13 +1208,15 @@ public class ProcedureExecutor { while (stackTail --> 0) { final Procedure proc = subprocStack.get(stackTail); - if (!reuseLock && !acquireLock(proc)) { + LockState lockState; + if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) { // can't take a lock on the procedure, add the root-proc back on the // queue waiting for the lock availability - return false; + return lockState; } - boolean abortRollback = !executeRollback(proc); + lockState = executeRollback(proc); + boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; abortRollback |= !isRunning() || !store.isRunning(); // If the next procedure is the same to this one @@ -1201,14 +1230,14 @@ public class ProcedureExecutor { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. 
if (abortRollback) { - return false; + return lockState; } subprocStack.remove(stackTail); // if the procedure is kind enough to pass the slot to someone else, yield if (proc.isYieldAfterExecutionStep(getEnvironment())) { - return false; + return LockState.LOCK_YIELD_WAIT; } if (proc != rootProc) { @@ -1221,7 +1250,7 @@ public class ProcedureExecutor { " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) + " exception=" + exception.getMessage()); procedureFinished(rootProc); - return true; + return LockState.LOCK_ACQUIRED; } /** @@ -1229,17 +1258,17 @@ public class ProcedureExecutor { * It updates the store with the new state (stack index) * or will remove completly the procedure in case it is a child. */ - private boolean executeRollback(final Procedure proc) { + private LockState executeRollback(final Procedure proc) { try { proc.doRollback(getEnvironment()); } catch (IOException e) { if (LOG.isDebugEnabled()) { LOG.debug("Roll back attempt failed for " + proc, e); } - return false; + return LockState.LOCK_YIELD_WAIT; } catch (InterruptedException e) { handleInterruptedException(proc, e); - return false; + return LockState.LOCK_YIELD_WAIT; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... 
LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e); @@ -1250,7 +1279,7 @@ public class ProcedureExecutor { if (testing != null && testing.shouldKillBeforeStoreUpdate()) { LOG.debug("TESTING: Kill before store update"); stop(); - return false; + return LockState.LOCK_YIELD_WAIT; } if (proc.removeStackIndex()) { @@ -1270,7 +1299,7 @@ public class ProcedureExecutor { store.update(proc); } - return true; + return LockState.LOCK_ACQUIRED; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index d4d5773..5042554 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -27,7 +27,6 @@ import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 8aa2088..2a659f8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.Callable; import java.util.ArrayList; import java.util.Set; @@ -67,27 +68,46 @@ public class ProcedureTestingUtility { }); } - public static void restart(ProcedureExecutor procExecutor) - throws Exception { - restart(procExecutor, null, true); + public static void restart(final ProcedureExecutor procExecutor) throws Exception { + restart(procExecutor, false, true, null, null); } - public static void restart(ProcedureExecutor procExecutor, - Runnable beforeStartAction, boolean failOnCorrupted) throws Exception { - ProcedureStore procStore = procExecutor.getStore(); - int storeThreads = procExecutor.getCorePoolSize(); - int execThreads = procExecutor.getCorePoolSize(); + public static void restart(final ProcedureExecutor procExecutor, + final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted, + final Callable stopAction, final Callable startAction) + throws Exception { + final ProcedureStore procStore = procExecutor.getStore(); + final int storeThreads = procExecutor.getCorePoolSize(); + final int execThreads = procExecutor.getCorePoolSize(); + + final ProcedureExecutor.Testing testing = procExecutor.testing; + if (avoidTestKillDuringRestart) { + procExecutor.testing = null; + } + // stop + LOG.info("RESTART - Stop"); procExecutor.stop(); - procExecutor.join(); procStore.stop(false); - // nothing running... - if (beforeStartAction != null) { - beforeStartAction.run(); + if (stopAction != null) { + stopAction.call(); } + procExecutor.join(); + procExecutor.getScheduler().clear(); + + // nothing running... 
+ // re-start + LOG.info("RESTART - Start"); procStore.start(storeThreads); procExecutor.start(execThreads, failOnCorrupted); + if (startAction != null) { + startAction.call(); + } + + if (avoidTestKillDuringRestart) { + procExecutor.testing = testing; + } } public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) @@ -309,11 +329,11 @@ public class ProcedureTestingUtility { public static void testRecoveryAndDoubleExecution(final ProcedureExecutor procExec, final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception { - final Procedure proc = procExec.getProcedure(procId); + Procedure proc = procExec.getProcedure(procId); waitProcedure(procExec, procId); assertEquals(false, procExec.isRunning()); - for (int i = 0; !procExec.isFinished(procId); ++i) { + proc = procExec.getProcedure(procId); LOG.info("Restart " + i + " exec state: " + proc); if (customRestart != null) { customRestart.run(); @@ -415,8 +435,8 @@ public class ProcedureTestingUtility { // Mark acquire/release lock functions public for test uses. 
@Override - public boolean acquireLock(Void env) { - return true; + public LockState acquireLock(Void env) { + return LockState.LOCK_ACQUIRED; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index 0a8b0e4..ba89768 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -212,11 +212,12 @@ public class TestProcedureSuspended { } @Override - protected boolean acquireLock(final TestProcEnv env) { + protected LockState acquireLock(final TestProcEnv env) { if ((hasLock = lock.compareAndSet(false, true))) { LOG.info("ACQUIRE LOCK " + this + " " + (hasLock)); + return LockState.LOCK_ACQUIRED; } - return hasLock; + return LockState.LOCK_YIELD_WAIT; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index fbe8ec6..04c9b43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1087,7 +1087,7 @@ public class HMaster extends HRegionServer implements MasterServices { new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); procedureExecutor = 
new ProcedureExecutor(conf, procEnv, procedureStore, - procEnv.getProcedureQueue()); + procEnv.getProcedureScheduler()); configurationManager.registerObserver(procEnv); final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index 1a1c8c3..20fc492 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -289,7 +289,7 @@ public final class LockProcedure extends Procedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { + protected LockState acquireLock(final MasterProcedureEnv env) { boolean ret = lock.acquireLock(env); locked.set(ret); hasLock = ret; @@ -298,8 +298,10 @@ public final class LockProcedure extends Procedure LOG.debug("LOCKED - " + toString()); } lastHeartBeat.set(System.currentTimeMillis()); + return LockState.LOCK_ACQUIRED; } - return ret; + LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING"); + return LockState.LOCK_EVENT_WAIT; } @Override @@ -414,37 +416,43 @@ public final class LockProcedure extends Procedure private class TableExclusiveLock implements LockInterface { @Override public boolean acquireLock(final MasterProcedureEnv env) { - return env.getProcedureScheduler().tryAcquireTableExclusiveLock(LockProcedure.this, tableName); + // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT + // to get the lock and false if you don't; i.e. you got the lock. 
+ return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName); } @Override public void releaseLock(final MasterProcedureEnv env) { - env.getProcedureScheduler().releaseTableExclusiveLock(LockProcedure.this, tableName); + env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName); } } private class TableSharedLock implements LockInterface { @Override public boolean acquireLock(final MasterProcedureEnv env) { - return env.getProcedureScheduler().tryAcquireTableSharedLock(LockProcedure.this, tableName); + // We invert return from waitTableSharedLock; it returns true if you HAVE TO WAIT + // to get the lock and false if you don't; i.e. you got the lock. 
+ return !env.getProcedureScheduler().waitNamespaceExclusiveLock( LockProcedure.this, namespace); } @Override public void releaseLock(final MasterProcedureEnv env) { - env.getProcedureScheduler().releaseNamespaceExclusiveLock( + env.getProcedureScheduler().wakeNamespaceExclusiveLock( LockProcedure.this, namespace); } } @@ -452,6 +460,8 @@ public final class LockProcedure extends Procedure private class RegionExclusiveLock implements LockInterface { @Override public boolean acquireLock(final MasterProcedureEnv env) { + // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT + // to get the lock and false if you don't; i.e. you got the lock. return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos); } @@ -460,4 +470,4 @@ public final class LockProcedure extends Procedure env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java index a514532..03fdaef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java @@ -58,13 +58,16 @@ public abstract class AbstractStateMachineNamespaceProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return false; - return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, 
getNamespaceName()); + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName()); + env.getProcedureScheduler().wakeNamespaceExclusiveLock(this, getNamespaceName()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 7cced45..e957f9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -75,14 +75,17 @@ public abstract class AbstractStateMachineTableProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName()); + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - 
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName()); + env.getProcedureScheduler().wakeTableExclusiveLock(this, getTableName()); } protected User getUser() { @@ -108,4 +111,4 @@ public abstract class AbstractStateMachineTableProcedure throw new TableNotFoundException(getTableName()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java index 982e880..2c39c09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java @@ -160,16 +160,19 @@ public class CreateNamespaceProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { + protected LockState acquireLock(final MasterProcedureEnv env) { if (!env.getMasterServices().isInitialized()) { // Namespace manager might not be ready if master is not fully initialized, // return false to reject user namespace creation; return true for default // and system namespace creation (this is part of master initialization). 
if (!isBootstrapNamespace() && env.waitInitialized(this)) { - return false; + return LockState.LOCK_EVENT_WAIT; } } - return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName()); + if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 0d24f51..2421dfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -216,11 +216,14 @@ public class CreateTableProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { + protected LockState acquireLock(final MasterProcedureEnv env) { if (!getTableName().isSystemTable() && env.waitInitialized(this)) { - return false; + return LockState.LOCK_EVENT_WAIT; + } + if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; } - return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName()); + return LockState.LOCK_ACQUIRED; } private boolean prepareCreate(final MasterProcedureEnv env) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 353342a..87c79b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -121,11 +121,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.getMasterCoprocessorHost(); } - @Deprecated - public MasterProcedureScheduler getProcedureQueue() { - return procSched; - } - public MasterProcedureScheduler getProcedureScheduler() { return procSched; } http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index b9b7b59..bd1b3e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -112,8 +112,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private > void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { - if (proc.isSuspended()) return; - queue.add(proc, addFront); if (!queue.hasExclusiveLock() || queue.isLockOwner(proc.getProcId())) { // if the queue was not remove for an xlock execution @@ -157,6 +155,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final boolean xlockReq = rq.requireExclusiveLock(pollResult); if (xlockReq && rq.isLocked() && 
!rq.hasLockAccess(pollResult)) { // someone is already holding the lock (e.g. shared lock). avoid a yield + removeFromRunQueue(fairq, rq); return null; } @@ -177,7 +176,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - public void clearQueue() { + protected void clearQueue() { // Remove Servers for (int i = 0; i < serverBuckets.length; ++i) { clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); @@ -460,7 +459,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - public synchronized boolean isAvailable() { + public boolean isAvailable() { // if there are no items in the queue, or the namespace is locked. // we can't execute operation on this table if (isEmpty() || namespaceQueue.hasExclusiveLock()) { @@ -478,7 +477,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return true; } - public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) { + public RegionEvent getRegionEvent(final HRegionInfo regionInfo) { if (regionEventMap == null) { regionEventMap = new HashMap(); } @@ -490,7 +489,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return event; } - public synchronized void removeRegionEvent(final RegionEvent event) { + public void removeRegionEvent(final RegionEvent event) { regionEventMap.remove(event.getRegionInfo().getEncodedName()); if (regionEventMap.isEmpty()) { regionEventMap = null; @@ -511,30 +510,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } public boolean requireExclusiveLock(Procedure proc) { - TableProcedureInterface tpi = (TableProcedureInterface)proc; - switch (tpi.getTableOperationType()) { - case CREATE: - case DELETE: - case DISABLE: - case ENABLE: - return true; - case EDIT: - // we allow concurrent edit on the NS table - return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); - case READ: - return false; - // region 
operations are using the shared-lock on the table - // and then they will grab an xlock on the region. - case SPLIT: - case MERGE: - case ASSIGN: - case UNASSIGN: - case REGION_EDIT: - return false; - default: - break; - } - throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); + return requireTableExclusiveLock((TableProcedureInterface)proc); } } @@ -589,96 +565,139 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Table Locking Helpers // ============================================================================ + private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case DISABLE: + case ENABLE: + return true; + case EDIT: + // we allow concurrent edit on the NS table + return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); + case READ: + return false; + // region operations are using the shared-lock on the table + // and then they will grab an xlock on the region. + case SPLIT: + case MERGE: + case ASSIGN: + case UNASSIGN: + case REGION_EDIT: + return false; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + + proc.getTableOperationType()); + } + /** - * Try to acquire the exclusive lock on the specified table. - * other operations in the table-queue will be executed after the lock is released. + * Suspend the procedure if the specified table is already locked. + * Other operations in the table-queue will be executed after the lock is released. * @param procedure the procedure trying to acquire the lock * @param table Table to lock - * @return true if we were able to acquire the lock on the table, otherwise false. 
+ * @return true if the procedure has to wait for the table to be available */ - public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) { + public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue queue = getTableQueue(table); - if (!queue.getNamespaceQueue().trySharedLock()) { - return false; + final TableQueue tableQueue = getTableQueue(table); + final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); + if (!nsQueue.trySharedLock()) { + suspendProcedure(nsQueue.getEvent(), procedure); + return true; } - - if (!queue.tryExclusiveLock(procedure)) { - queue.getNamespaceQueue().releaseSharedLock(); - return false; + if (!tableQueue.tryExclusiveLock(procedure)) { + nsQueue.releaseSharedLock(); + suspendProcedure(tableQueue.getEvent(), procedure); + return true; } - - removeFromRunQueue(tableRunQueue, queue); - return true; + removeFromRunQueue(tableRunQueue, tableQueue); + return false; } finally { schedUnlock(); } } /** - * Release the exclusive lock taken with tryAcquireTableWrite() + * Wake the procedures waiting for the specified table * @param procedure the procedure releasing the lock * @param table the name of the table that has the exclusive lock */ - public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) { + public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue queue = getTableQueue(table); - if (!queue.hasParentLock(procedure)) { - queue.releaseExclusiveLock(procedure); + final TableQueue tableQueue = getTableQueue(table); + int waitingCount = 0; + + if (!tableQueue.hasParentLock(procedure)) { + tableQueue.releaseExclusiveLock(procedure); + waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); } - queue.getNamespaceQueue().releaseSharedLock(); - addToRunQueue(tableRunQueue, queue); + final NamespaceQueue nsQueue = 
tableQueue.getNamespaceQueue(); + if (nsQueue.releaseSharedLock()) { + waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + } + addToRunQueue(tableRunQueue, tableQueue); + wakePollIfNeeded(waitingCount); } finally { schedUnlock(); } } /** - * Try to acquire the shared lock on the specified table. + * Suspend the procedure if the specified table is already locked. * other "read" operations in the table-queue may be executed concurrently, * @param procedure the procedure trying to acquire the lock * @param table Table to lock - * @return true if we were able to acquire the lock on the table, otherwise false. + * @return true if the procedure has to wait for the table to be available */ - public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) { - return tryAcquireTableQueueSharedLock(procedure, table) != null; + public boolean waitTableSharedLock(final Procedure procedure, final TableName table) { + return waitTableQueueSharedLock(procedure, table) == null; } - private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure, - final TableName table) { + private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue queue = getTableQueue(table); - if (!queue.getNamespaceQueue().trySharedLock()) { + final TableQueue tableQueue = getTableQueue(table); + final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); + if (!nsQueue.trySharedLock()) { + suspendProcedure(nsQueue.getEvent(), procedure); return null; } - if (!queue.trySharedLock()) { - queue.getNamespaceQueue().releaseSharedLock(); + if (!tableQueue.trySharedLock()) { + tableQueue.getNamespaceQueue().releaseSharedLock(); + suspendProcedure(tableQueue.getEvent(), procedure); return null; } - return queue; + return tableQueue; } finally { schedUnlock(); } } /** - * Release the shared lock taken with tryAcquireTableRead() + * Wake the procedures waiting for the specified table * 
@param procedure the procedure releasing the lock * @param table the name of the table that has the shared lock */ - public void releaseTableSharedLock(final Procedure procedure, final TableName table) { + public void wakeTableSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue queue = getTableQueue(table); - if (queue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, queue); + final TableQueue tableQueue = getTableQueue(table); + final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); + int waitingCount = 0; + if (tableQueue.releaseSharedLock()) { + addToRunQueue(tableRunQueue, tableQueue); + waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + } + if (nsQueue.releaseSharedLock()) { + waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); } - queue.getNamespaceQueue().releaseSharedLock(); + wakePollIfNeeded(waitingCount); } finally { schedUnlock(); } @@ -746,7 +765,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { queue = getTableQueueWithLock(table); } else { // acquire the table shared-lock - queue = tryAcquireTableQueueSharedLock(procedure, table); + queue = waitTableQueueSharedLock(procedure, table); if (queue == null) return true; } @@ -771,7 +790,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } if (!hasLock && !procedure.hasParent()) { - releaseTableSharedLock(procedure, table); + wakeTableSharedLock(procedure, table); } return !hasLock; } @@ -822,13 +841,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { for (int i = numProcs - 1; i >= 0; --i) { wakeProcedure(nextProcs[i]); } - wakePollIfNeeded(numProcs); - if (!procedure.hasParent()) { // release the table shared-lock. 
// (if we have a parent, it is holding an xlock so we didn't take the shared-lock) - releaseTableSharedLock(procedure, table); + wakeTableSharedLock(procedure, table); } } finally { schedUnlock(); @@ -839,45 +856,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // Namespace Locking Helpers // ============================================================================ /** - * Try to acquire the exclusive lock on the specified namespace. - * @see #releaseNamespaceExclusiveLock(Procedure,String) + * Suspend the procedure if the specified namespace is already locked. + * @see #wakeNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure trying to acquire the lock * @param nsName Namespace to lock - * @return true if we were able to acquire the lock on the namespace, otherwise false. + * @return true if the procedure has to wait for the namespace to be available */ - public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String nsName) { schedLock(); try { - TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - if (!tableQueue.trySharedLock()) return false; + final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); + if (!tableQueue.trySharedLock()) { + suspendProcedure(tableQueue.getEvent(), procedure); + return true; + } - NamespaceQueue nsQueue = getNamespaceQueue(nsName); - boolean hasLock = nsQueue.tryExclusiveLock(procedure); - if (!hasLock) { + final NamespaceQueue nsQueue = getNamespaceQueue(nsName); + if (!nsQueue.tryExclusiveLock(procedure)) { tableQueue.releaseSharedLock(); + suspendProcedure(nsQueue.getEvent(), procedure); + return true; } - return hasLock; + return false; } finally { schedUnlock(); } } /** - * Release the exclusive lock - * @see #tryAcquireNamespaceExclusiveLock(Procedure,String) + * Wake the procedures waiting for the specified 
namespace + * @see #waitNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure releasing the lock * @param nsName the namespace that has the exclusive lock */ - public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public void wakeNamespaceExclusiveLock(final Procedure procedure, final String nsName) { schedLock(); try { final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - final NamespaceQueue queue = getNamespaceQueue(nsName); - - queue.releaseExclusiveLock(procedure); + final NamespaceQueue nsQueue = getNamespaceQueue(nsName); + int waitingCount = 0; + nsQueue.releaseExclusiveLock(procedure); if (tableQueue.releaseSharedLock()) { addToRunQueue(tableRunQueue, tableQueue); + waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); } + waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + wakePollIfNeeded(waitingCount); } finally { schedUnlock(); } @@ -888,67 +912,45 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ /** * Try to acquire the exclusive lock on the specified server. - * @see #releaseServerExclusiveLock(Procedure,ServerName) + * @see #wakeServerExclusiveLock(Procedure,ServerName) * @param procedure the procedure trying to acquire the lock * @param serverName Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. 
+ * @return true if the procedure has to wait for the server to be available */ - public boolean tryAcquireServerExclusiveLock(final Procedure procedure, - final ServerName serverName) { + public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { ServerQueue queue = getServerQueue(serverName); if (queue.tryExclusiveLock(procedure)) { removeFromRunQueue(serverRunQueue, queue); - return true; + return false; } + suspendProcedure(queue.getEvent(), procedure); + return true; } finally { schedUnlock(); } - return false; } /** - * Release the exclusive lock - * @see #tryAcquireServerExclusiveLock(Procedure,ServerName) + * Wake the procedures waiting for the specified server + * @see #waitServerExclusiveLock(Procedure,ServerName) * @param procedure the procedure releasing the lock * @param serverName the server that has the exclusive lock */ - public void releaseServerExclusiveLock(final Procedure procedure, - final ServerName serverName) { + public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { - ServerQueue queue = getServerQueue(serverName); + final ServerQueue queue = getServerQueue(serverName); queue.releaseExclusiveLock(procedure); addToRunQueue(serverRunQueue, queue); + int waitingCount = popEventWaitingProcedures(queue.getEvent()); + wakePollIfNeeded(waitingCount); } finally { schedUnlock(); } } - /** - * Try to acquire the shared lock on the specified server. - * @see #releaseServerSharedLock(Procedure,ServerName) - * @param procedure the procedure releasing the lock - * @param serverName Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. 
- */ - public boolean tryAcquireServerSharedLock(final Procedure procedure, - final ServerName serverName) { - return getServerQueueWithLock(serverName).trySharedLock(); - } - - /** - * Release the shared lock taken - * @see #tryAcquireServerSharedLock(Procedure,ServerName) - * @param procedure the procedure releasing the lock - * @param serverName the server that has the shared lock - */ - public void releaseServerSharedLock(final Procedure procedure, - final ServerName serverName) { - getServerQueueWithLock(serverName).releaseSharedLock(); - } - // ============================================================================ // Generic Helpers // ============================================================================ @@ -965,8 +967,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { boolean isSuspended(); } + // TODO Why OK not having synchronized access and/or volatiles and + // sharedLock-- and sharedLock++? Is this accessed by one thread only? + // Write up the concurrency expectations. St.Ack 01/19/2017 private static abstract class Queue> extends AvlLinkedNode> implements QueueInterface { + private final ProcedureEventQueue event; private boolean suspended = false; private long exclusiveLockProcIdOwner = Long.MIN_VALUE; @@ -982,6 +988,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public Queue(TKey key, int priority) { this.key = key; this.priority = priority; + this.event = new ProcedureEventQueue(); } protected TKey getKey() { @@ -992,6 +999,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return priority; } + public ProcedureEventQueue getEvent() { + return event; + } + /** * True if the queue is not in the run-queue and it is owned by an event. 
*/ @@ -1008,48 +1019,48 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ====================================================================== // Read/Write Locking helpers // ====================================================================== - public synchronized boolean isLocked() { + public boolean isLocked() { return hasExclusiveLock() || sharedLock > 0; } - public synchronized boolean hasExclusiveLock() { + public boolean hasExclusiveLock() { return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; } - public synchronized boolean trySharedLock() { + public boolean trySharedLock() { if (hasExclusiveLock()) return false; sharedLock++; return true; } - public synchronized boolean releaseSharedLock() { + public boolean releaseSharedLock() { return --sharedLock == 0; } - protected synchronized boolean isSingleSharedLock() { + protected boolean isSingleSharedLock() { return sharedLock == 1; } - public synchronized boolean isLockOwner(long procId) { + public boolean isLockOwner(long procId) { return exclusiveLockProcIdOwner == procId; } - public synchronized boolean hasParentLock(final Procedure proc) { + public boolean hasParentLock(final Procedure proc) { return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); } - public synchronized boolean hasLockAccess(final Procedure proc) { + public boolean hasLockAccess(final Procedure proc) { return isLockOwner(proc.getProcId()) || hasParentLock(proc); } - public synchronized boolean tryExclusiveLock(final Procedure proc) { + public boolean tryExclusiveLock(final Procedure proc) { if (isLocked()) return hasLockAccess(proc); exclusiveLockProcIdOwner = proc.getProcId(); return true; } - public synchronized boolean releaseExclusiveLock(final Procedure proc) { + public boolean releaseExclusiveLock(final Procedure proc) { if (isLockOwner(proc.getProcId())) { exclusiveLockProcIdOwner = Long.MIN_VALUE; return true; @@ -1059,7 +1070,7 @@ public class 
MasterProcedureScheduler extends AbstractProcedureScheduler { // This should go away when we have the new AM and its events // and we move xlock to the lock-event-queue. - public synchronized boolean isAvailable() { + public boolean isAvailable() { return !hasExclusiveLock() && !isEmpty(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java index c313700..d7fe5f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java @@ -322,17 +322,19 @@ public class MergeTableRegionsProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { + protected LockState acquireLock(final MasterProcedureEnv env) { if (env.waitInitialized(this)) { - return false; + return LockState.LOCK_EVENT_WAIT; } - return !env.getProcedureQueue().waitRegions( - this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + return env.getProcedureScheduler().waitRegions(this, getTableName(), + regionsToMerge[0], regionsToMerge[1])? 
+ LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + env.getProcedureScheduler().wakeRegions(this, getTableName(), + regionsToMerge[0], regionsToMerge[1]); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 98a2152..7b4eb6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -562,14 +562,19 @@ implements ServerProcedureInterface { } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { - if (env.waitServerCrashProcessingEnabled(this)) return false; - return env.getProcedureQueue().tryAcquireServerExclusiveLock(this, getServerName()); + protected LockState acquireLock(final MasterProcedureEnv env) { + // TODO: Put this BACK AFTER AMv2 goes in!!!! 
+ // if (env.waitFailoverCleanup(this)) return LockState.LOCK_EVENT_WAIT; + if (env.waitServerCrashProcessingEnabled(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseServerExclusiveLock(this, getServerName()); + env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java index 4730ad8..69b89be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java @@ -347,11 +347,12 @@ public class SplitTableRegionProcedure } @Override - protected boolean acquireLock(final MasterProcedureEnv env) { + protected LockState acquireLock(final MasterProcedureEnv env) { if (env.waitInitialized(this)) { - return false; + return LockState.LOCK_EVENT_WAIT; } - return !env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI); + return env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI)? 
+ LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/test/java/org/apache/hadoop/hbase/master/locking/TestLockManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/locking/TestLockManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/locking/TestLockManager.java index 1f3241d..fa43fbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/locking/TestLockManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/locking/TestLockManager.java @@ -59,7 +59,6 @@ public class TestLockManager { private static final Log LOG = LogFactory.getLog(TestLockProcedure.class); protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static final Configuration conf = UTIL.getConfiguration(); private static MasterServices masterServices; private static String namespace = "namespace"; http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java index efa45e7..2b28c9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java @@ -62,11 +62,11 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase + "proportion of table:region ops is 1:regions_per_table. 
Default: " + DEFAULT_OPS_TYPE); - private int numTables; - private int regionsPerTable; - private int numOps; - private int numThreads; - private String opsType; + private int numTables = DEFAULT_NUM_TABLES; + private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE; + private int numOps = DEFAULT_NUM_OPERATIONS; + private int numThreads = DEFAULT_NUM_THREADS; + private String opsType = DEFAULT_OPS_TYPE; private MasterProcedureScheduler procedureScheduler; // List of table/region procedures to schedule. @@ -83,10 +83,13 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase super(procId, hri.getTable(), TableOperationType.UNASSIGN, hri); } - public boolean acquireLock(Void env) { - return !procedureScheduler.waitRegions(this, getTableName(), getRegionInfo()); + @Override + public LockState acquireLock(Void env) { + return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())? + LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; } + @Override public void releaseLock(Void env) { procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo()); } @@ -110,12 +113,15 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase super(procId, tableName, TableOperationType.EDIT); } - public boolean acquireLock(Void env) { - return procedureScheduler.tryAcquireTableExclusiveLock(this, getTableName()); + @Override + public LockState acquireLock(Void env) { + return procedureScheduler.waitTableExclusiveLock(this, getTableName())? 
+ LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; } + @Override public void releaseLock(Void env) { - procedureScheduler.releaseTableExclusiveLock(this, getTableName()); + procedureScheduler.wakeTableExclusiveLock(this, getTableName()); } } @@ -212,11 +218,15 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase continue; } - if (proc.acquireLock(null)) { - completed.incrementAndGet(); - proc.releaseLock(null); - } else { - procedureScheduler.yield(proc); + switch (proc.acquireLock(null)) { + case LOCK_ACQUIRED: + completed.incrementAndGet(); + proc.releaseLock(null); + break; + case LOCK_YIELD_WAIT: + break; + case LOCK_EVENT_WAIT: + break; } if (completed.get() % 100000 == 0) { System.out.println("Completed " + completed.get() + " procedures."); http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java index 2bd4f44..7e6691d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hbase.master.procedure; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.List; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,17 +49,12 @@ import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.TableStateManager; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; -import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.MD5Hash; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; public class MasterProcedureTestingUtility { private static final Log LOG = LogFactory.getLog(MasterProcedureTestingUtility.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index a88eb62..450714f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -18,14 +18,15 @@ package org.apache.hadoop.hbase.master.procedure; +import static org.junit.Assert.assertEquals; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; @@ -37,16 +38,12 @@ import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; - import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - @Category({MasterTests.class, MediumTests.class}) public class TestMasterProcedureEvents { private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class); @@ -141,7 +138,7 @@ public class TestMasterProcedureEvents { private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event, final Procedure proc) throws Exception { final ProcedureExecutor procExec = master.getMasterProcedureExecutor(); - final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler(); final long startPollCalls = procSched.getPollCalls(); final long startNullPollCalls = procSched.getNullPollCalls(); http://git-wip-us.apache.org/repos/asf/hbase/blob/980c8c20/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 
dc60710..438736e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.master.procedure; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Arrays; @@ -25,7 +29,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -39,10 +42,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - @Category({MasterTests.class, SmallTests.class}) public class TestMasterProcedureScheduler { private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class); @@ -89,8 +88,8 @@ public class TestMasterProcedureScheduler { Procedure proc = queue.poll(); assertTrue(proc != null); TableName tableName = ((TestTableProcedure)proc).getTableName(); - queue.tryAcquireTableExclusiveLock(proc, tableName); - queue.releaseTableExclusiveLock(proc, tableName); + queue.waitTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); queue.completionCleanup(proc); assertEquals(--count, queue.size()); assertEquals(i * 1000 + j, proc.getProcId()); @@ -128,12 +127,12 @@ public class TestMasterProcedureScheduler { Procedure proc = queue.poll(); assertEquals(1, proc.getProcId()); // take the xlock - 
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // table can't be deleted because we have the lock assertEquals(0, queue.size()); assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); // release the xlock - queue.releaseTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); // complete the table deletion assertTrue(queue.markTableAsDeleted(tableName, proc)); } @@ -164,7 +163,7 @@ public class TestMasterProcedureScheduler { Procedure proc = procs[i] = queue.poll(); assertEquals(i + 1, proc.getProcId()); // take the rlock - assertTrue(queue.tryAcquireTableSharedLock(proc, tableName)); + assertEquals(false, queue.waitTableSharedLock(proc, tableName)); // table can't be deleted because we have locks and/or items in the queue assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); } @@ -173,7 +172,7 @@ public class TestMasterProcedureScheduler { // table can't be deleted because we have locks assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); // release the rlock - queue.releaseTableSharedLock(procs[i], tableName); + queue.wakeTableSharedLock(procs[i], tableName); } // there are no items and no lock in the queeu @@ -202,48 +201,48 @@ public class TestMasterProcedureScheduler { // Fetch the 1st item and take the write lock Procedure proc = queue.poll(); assertEquals(1, proc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // Fetch the 2nd item and verify that the lock can't be acquired assertEquals(null, queue.poll(0)); // Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); // Fetch the 2nd item and take the read lock Procedure rdProc = queue.poll(); assertEquals(2, rdProc.getProcId()); - assertEquals(true, 
queue.tryAcquireTableSharedLock(rdProc, tableName)); + assertEquals(false, queue.waitTableSharedLock(rdProc, tableName)); // Fetch the 3rd item and verify that the lock can't be acquired assertEquals(null, queue.poll(0)); // release the rdlock of item 2 and take the wrlock for the 3d item - queue.releaseTableSharedLock(rdProc, tableName); + queue.wakeTableSharedLock(rdProc, tableName); // Fetch the 3rd item and take the write lock Procedure wrProc = queue.poll(); - assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName)); // Fetch 4th item and verify that the lock can't be acquired assertEquals(null, queue.poll(0)); // Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(wrProc, tableName); + queue.wakeTableExclusiveLock(wrProc, tableName); // Fetch the 4th item and take the read lock rdProc = queue.poll(); assertEquals(4, rdProc.getProcId()); - assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName)); + assertEquals(false, queue.waitTableSharedLock(rdProc, tableName)); // Fetch the 4th item and take the read lock Procedure rdProc2 = queue.poll(); assertEquals(5, rdProc2.getProcId()); - assertEquals(true, queue.tryAcquireTableSharedLock(rdProc2, tableName)); + assertEquals(false, queue.waitTableSharedLock(rdProc2, tableName)); // Release 4th and 5th read-lock - queue.releaseTableSharedLock(rdProc, tableName); - queue.releaseTableSharedLock(rdProc2, tableName); + queue.wakeTableSharedLock(rdProc, tableName); + queue.wakeTableSharedLock(rdProc2, tableName); // remove table queue assertEquals(0, queue.size()); @@ -268,34 +267,36 @@ public class TestMasterProcedureScheduler { // Fetch the 1st item and take the write lock Procedure procNs1 = queue.poll(); assertEquals(1, procNs1.getProcId()); - assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs1, nsName1)); + assertEquals(false, queue.waitNamespaceExclusiveLock(procNs1, 
nsName1)); // System tables have 2 as default priority Procedure procNs2 = queue.poll(); assertEquals(4, procNs2.getProcId()); - assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs2, nsName2)); - queue.releaseNamespaceExclusiveLock(procNs2, nsName2); + assertEquals(false, queue.waitNamespaceExclusiveLock(procNs2, nsName2)); + queue.wakeNamespaceExclusiveLock(procNs2, nsName2); + + // add procNs2 back in the queue queue.yield(procNs2); // table on ns1 is locked, so we get table on ns2 procNs2 = queue.poll(); assertEquals(3, procNs2.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(procNs2, tableName2)); + assertEquals(false, queue.waitTableExclusiveLock(procNs2, tableName2)); // ns2 is not available (TODO we may avoid this one) Procedure procNs2b = queue.poll(); assertEquals(4, procNs2b.getProcId()); - assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(procNs2b, nsName2)); - queue.yield(procNs2b); + assertEquals(true, queue.waitNamespaceExclusiveLock(procNs2b, nsName2)); // release the ns1 lock - queue.releaseNamespaceExclusiveLock(procNs1, nsName1); + queue.wakeNamespaceExclusiveLock(procNs1, nsName1); // we are now able to execute table of ns1 long procId = queue.poll().getProcId(); assertEquals(2, procId); - queue.releaseTableExclusiveLock(procNs2, tableName2); + // release ns2 + queue.wakeTableExclusiveLock(procNs2, tableName2); // we are now able to execute ns2 procId = queue.poll().getProcId(); @@ -314,35 +315,18 @@ public class TestMasterProcedureScheduler { // Fetch the ns item and take the xlock Procedure proc = queue.poll(); assertEquals(1, proc.getProcId()); - assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName)); + assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName)); // the table operation can't be executed because the ns is locked assertEquals(null, queue.poll(0)); // release the ns lock - queue.releaseNamespaceExclusiveLock(proc, nsName); + 
queue.wakeNamespaceExclusiveLock(proc, nsName); proc = queue.poll(); assertEquals(2, proc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); - queue.releaseTableExclusiveLock(proc, tableName); - } - - @Test - public void testSharedLock() throws Exception { - final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - final TableName tableName = TableName.valueOf("testtb"); - TestTableProcedure procA = - new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ); - TestTableProcedure procB = - new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ); - - assertTrue(queue.tryAcquireTableSharedLock(procA, tableName)); - assertTrue(queue.tryAcquireTableSharedLock(procB, tableName)); - - queue.releaseTableSharedLock(procA, tableName); - queue.releaseTableSharedLock(procB, tableName); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); + queue.wakeTableExclusiveLock(proc, tableName); } @Test @@ -371,13 +355,13 @@ public class TestMasterProcedureScheduler { // Fetch the 2nd item and take the xlock proc = queue.poll(); assertEquals(2, proc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // everything is locked by the table operation assertEquals(null, queue.poll(0)); // release the table xlock - queue.releaseTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); // grab the last item in the queue proc = queue.poll(); @@ -410,13 +394,13 @@ public class TestMasterProcedureScheduler { // Fetch the 1st item and take the write lock Procedure proc = queue.poll(); assertEquals(1, proc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // everything is locked by the table operation assertEquals(null, queue.poll(0)); 
// release the table lock - queue.releaseTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); // Fetch the 2nd item and the the lock on regionA and regionB Procedure mergeProc = queue.poll(); @@ -475,7 +459,7 @@ public class TestMasterProcedureScheduler { // Fetch the 1st item from the queue, "the root procedure" and take the table lock Procedure rootProc = queue.poll(); assertEquals(1, rootProc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(rootProc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName)); assertEquals(null, queue.poll(0)); // Execute the 1st step of the root-proc. @@ -519,7 +503,7 @@ public class TestMasterProcedureScheduler { assertEquals(null, queue.poll(0)); // release the table lock (for the root procedure) - queue.releaseTableExclusiveLock(rootProc, tableName); + queue.wakeTableExclusiveLock(rootProc, tableName); } @Test @@ -639,7 +623,7 @@ public class TestMasterProcedureScheduler { // fetch and acquire first xlock proc Procedure parentProc = queue.poll(); assertEquals(rootProc, parentProc); - assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName)); // add child procedure for (int i = 0; i < childProcs.length; ++i) { @@ -662,13 +646,13 @@ public class TestMasterProcedureScheduler { assertEquals(null, queue.poll(0)); // release xlock - queue.releaseTableExclusiveLock(parentProc, tableName); + queue.wakeTableExclusiveLock(parentProc, tableName); // fetch the other xlock proc Procedure proc = queue.poll(); assertEquals(100, proc.getProcId()); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - queue.releaseTableExclusiveLock(proc, tableName); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); + queue.wakeTableExclusiveLock(proc, tableName); } @Test @@ -697,7 +681,7 @@ public class TestMasterProcedureScheduler { // fetch and acquire 
first xlock proc Procedure parentProc = queue.poll(); assertEquals(rootProc, parentProc); - assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName)); // add child procedure queue.addFront(childProc); @@ -705,11 +689,11 @@ public class TestMasterProcedureScheduler { // fetch the other xlock proc Procedure proc = queue.poll(); assertEquals(childProc, proc); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - queue.releaseTableExclusiveLock(proc, tableName); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); + queue.wakeTableExclusiveLock(proc, tableName); // release xlock - queue.releaseTableExclusiveLock(parentProc, tableName); + queue.wakeTableExclusiveLock(parentProc, tableName); } @Test @@ -724,7 +708,7 @@ public class TestMasterProcedureScheduler { // fetch from the queue and acquire xlock for the first proc Procedure proc = queue.poll(); assertEquals(1, proc.getProcId()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // nothing available, until xlock release assertEquals(null, queue.poll(0)); @@ -737,7 +721,7 @@ public class TestMasterProcedureScheduler { assertEquals(1, proc.getProcId()); // release the xlock - queue.releaseTableExclusiveLock(proc, tableName); + queue.wakeTableExclusiveLock(proc, tableName); proc = queue.poll(); assertEquals(2, proc.getProcId()); @@ -757,12 +741,12 @@ public class TestMasterProcedureScheduler { // fetch and acquire the first shared-lock Procedure proc1 = queue.poll(); assertEquals(1, proc1.getProcId()); - assertEquals(true, queue.tryAcquireTableSharedLock(proc1, tableName)); + assertEquals(false, queue.waitTableSharedLock(proc1, tableName)); // fetch and acquire the second shared-lock Procedure proc2 = queue.poll(); assertEquals(2, proc2.getProcId()); - assertEquals(true, 
queue.tryAcquireTableSharedLock(proc2, tableName)); + assertEquals(false, queue.waitTableSharedLock(proc2, tableName)); // nothing available, until xlock release assertEquals(null, queue.poll(0)); @@ -778,8 +762,8 @@ public class TestMasterProcedureScheduler { assertEquals(2, proc2.getProcId()); // release the xlock - queue.releaseTableSharedLock(proc1, tableName); - queue.releaseTableSharedLock(proc2, tableName); + queue.wakeTableSharedLock(proc1, tableName); + queue.wakeTableSharedLock(proc2, tableName); Procedure proc3 = queue.poll(); assertEquals(3, proc3.getProcId());