Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 08B55200CBB for ; Sat, 27 May 2017 01:47:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 075CA160BD9; Fri, 26 May 2017 23:47:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EE557160BDF for ; Sat, 27 May 2017 01:47:21 +0200 (CEST) Received: (qmail 83211 invoked by uid 500); 26 May 2017 23:47:14 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 79221 invoked by uid 99); 26 May 2017 23:47:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 May 2017 23:47:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 05234E04AA; Fri, 26 May 2017 23:47:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Fri, 26 May 2017 23:48:06 -0000 Message-Id: <5c36202148b34b66ab7fdb3789f2997c@git.apache.org> In-Reply-To: <7a3a6e2c875246c896b1e32821e9dba2@git.apache.org> References: <7a3a6e2c875246c896b1e32821e9dba2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [58/59] [abbrv] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility. 
archived-at: Fri, 26 May 2017 23:47:24 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 1bb6118..b648cf2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -32,6 +32,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.DelayQueue; @@ -113,9 +115,11 @@ public class ProcedureExecutor { * Internal cleaner that removes the completed procedure results after a TTL. * NOTE: This is a special case handled in timeoutLoop(). * - * Since the client code looks more or less like: + *

Since the client code looks more or less like: + *

    *   procId = master.doOperation()
    *   while (master.getProcResult(procId) == ProcInProgress);
+   * 
* The master should not throw away the proc result as soon as the procedure is done * but should wait a result request from the client (see executor.removeResult(procId)) * The client will call something like master.isProcDone() or master.getProcResult() @@ -480,10 +484,10 @@ public class ProcedureExecutor { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; - LOG.info("Starting executor worker threads=" + corePoolSize); + LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize); // Create the Thread Group for the executors - threadGroup = new ThreadGroup("ProcedureExecutor"); + threadGroup = new ThreadGroup("ProcExecThrdGrp"); // Create the timeout executor timeoutExecutor = new TimeoutExecutorThread(threadGroup); @@ -1077,13 +1081,16 @@ public class ProcedureExecutor { final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback + LOG.warn("Rollback because parent is done/rolledback proc=" + proc); executeRollback(proc); return; } final RootProcedureState procStack = rollbackStack.get(rootProcId); - if (procStack == null) return; - + if (procStack == null) { + LOG.warn("RootProcedureState is null for " + proc.getProcId()); + return; + } do { // Try to acquire the execution if (!procStack.acquire(proc)) { @@ -1097,6 +1104,7 @@ public class ProcedureExecutor { scheduler.yield(proc); break; case LOCK_EVENT_WAIT: + LOG.info("LOCK_EVENT_WAIT rollback..." + proc); procStack.unsetRollback(); break; default: @@ -1114,6 +1122,7 @@ public class ProcedureExecutor { scheduler.yield(proc); break; case LOCK_EVENT_WAIT: + LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." 
+ proc); break; default: throw new UnsupportedOperationException(); @@ -1125,16 +1134,21 @@ public class ProcedureExecutor { // Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - switch (acquireLock(proc)) { + // Note that lock is NOT about concurrency but rather about ensuring + // ownership of a procedure of an entity such as a region or table + LockState lockState = acquireLock(proc); + switch (lockState) { case LOCK_ACQUIRED: execProcedure(procStack, proc); releaseLock(proc, false); break; case LOCK_YIELD_WAIT: + LOG.info(lockState + " " + proc); scheduler.yield(proc); break; case LOCK_EVENT_WAIT: - // someone will wake us up when the lock is available + // Someone will wake us up when the lock is available + LOG.debug(lockState + " " + proc); break; default: throw new UnsupportedOperationException(); @@ -1150,10 +1164,7 @@ public class ProcedureExecutor { if (proc.isSuccess()) { // update metrics on finishing the procedure proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); - - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); - } + LOG.info("Finish " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); // Finalize the procedure state if (proc.getProcId() == rootProcId) { procedureFinished(proc); @@ -1178,7 +1189,7 @@ public class ProcedureExecutor { private void releaseLock(final Procedure proc, final boolean force) { final TEnvironment env = getEnvironment(); - // for how the framework works, we know that we will always have the lock + // For how the framework works, we know that we will always have the lock // when we call releaseLock(), so we can avoid calling proc.hasLock() if (force || !proc.holdLock(env)) { proc.doReleaseLock(env); @@ -1193,6 +1204,8 @@ public class ProcedureExecutor { private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { final Procedure rootProc = 
procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); + // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are + // rolling back because the subprocedure does. Clarify. if (exception == null) { exception = procStack.getException(); rootProc.setFailure(exception); @@ -1269,7 +1282,7 @@ public class ProcedureExecutor { return LockState.LOCK_YIELD_WAIT; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e); + LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e); } // allows to kill the executor before something is stored to the wal. @@ -1305,29 +1318,55 @@ public class ProcedureExecutor { } /** - * Executes the specified procedure - * - calls the doExecute() of the procedure - * - if the procedure execution didn't fail (e.g. invalid user input) - * - ...and returned subprocedures - * - the subprocedures are initialized. - * - the subprocedures are added to the store - * - the subprocedures are added to the runnable queue - * - the procedure is now in a WAITING state, waiting for the subprocedures to complete - * - ...if there are no subprocedure - * - the procedure completed successfully - * - if there is a parent (WAITING) - * - the parent state will be set to RUNNABLE - * - in case of failure - * - the store is updated with the new state - * - the executor (caller of this method) will start the rollback of the procedure + * Executes procedure + *
    + *
  • Calls the doExecute() of the procedure + *
  • If the procedure execution didn't fail (i.e. valid user input) + *
      + *
    • ...and returned subprocedures + *
      • The subprocedures are initialized. + *
      • The subprocedures are added to the store + *
      • The subprocedures are added to the runnable queue + *
      • The procedure is now in a WAITING state, waiting for the subprocedures to complete + *
      + *
    • + *
    • ...if there are no subprocedures + *
      • the procedure completed successfully + *
      • if there is a parent (WAITING) + *
      • the parent state will be set to RUNNABLE + *
      + *
    • + *
    + *
  • + *
  • In case of failure + *
      + *
    • The store is updated with the new state
    • + *
    • The executor (caller of this method) will start the rollback of the procedure
    • + *
    + *
  • + *
*/ - private void execProcedure(final RootProcedureState procStack, final Procedure procedure) { + private void execProcedure(final RootProcedureState procStack, + final Procedure procedure) { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE); - // Execute the procedure + // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. + // The exception is caught below and then we hurry to the exit without disturbing state. The + // idea is that the processing of this procedure will be unsuspended later by an external event + // such the report of a region open. TODO: Currently, its possible for two worker threads + // to be working on the same procedure concurrently (locking in procedures is NOT about + // concurrency but about tying an entity to a procedure; i.e. a region to a particular + // procedure instance). This can make for issues if both threads are changing state. + // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); + // in RegionTransitionProcedure#reportTransition for example of Procedure putting + // itself back on the scheduler making it possible for two threads running against + // the one Procedure. Might be ok if they are both doing different, idempotent sections. boolean suspended = false; + + // Whether to 're-' -execute; run through the loop again. 
boolean reExecute = false; - Procedure[] subprocs = null; + + Procedure[] subprocs = null; do { reExecute = false; try { @@ -1336,14 +1375,20 @@ public class ProcedureExecutor { subprocs = null; } } catch (ProcedureSuspendedException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Suspend " + procedure); + } suspended = true; } catch (ProcedureYieldException e) { if (LOG.isTraceEnabled()) { - LOG.trace("Yield " + procedure + ": " + e.getMessage()); + LOG.trace("Yield " + procedure + ": " + e.getMessage(), e); } scheduler.yield(procedure); return; } catch (InterruptedException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e); + } handleInterruptedException(procedure, e); scheduler.yield(procedure); return; @@ -1357,14 +1402,26 @@ public class ProcedureExecutor { if (!procedure.isFailed()) { if (subprocs != null) { if (subprocs.length == 1 && subprocs[0] == procedure) { - // quick-shortcut for a state machine like procedure + // Procedure returned itself. Quick-shortcut for a state machine-like procedure; + // i.e. we go around this loop again rather than go back out on the scheduler queue. subprocs = null; reExecute = true; + if (LOG.isTraceEnabled()) { + LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId()); + } } else { - // yield the current procedure, and make the subprocedure runnable + // Yield the current procedure, and make the subprocedure runnable + // subprocs may come back 'null'. subprocs = initializeChildren(procStack, procedure, subprocs); + LOG.info("Initialized subprocedures=" + + (subprocs == null? null: + Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
+ collect(Collectors.toList()).toString())); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + if (LOG.isTraceEnabled()) { + LOG.trace("Added to timeoutExecutor " + procedure); + } timeoutExecutor.add(procedure); } else if (!suspended) { // No subtask, so we are done @@ -1388,12 +1445,13 @@ public class ProcedureExecutor { // executor thread to stop. The statement following the method call below seems to check if // store is not running, to prevent scheduling children procedures, re-execution or yield // of this procedure. This may need more scrutiny and subsequent cleanup in future - // Commit the transaction + // + // Commit the transaction even if a suspend (state may have changed). Note this append + // can take a bunch of time to complete. updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting if (!store.isRunning()) return; - // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { @@ -1403,14 +1461,14 @@ public class ProcedureExecutor { assert (reExecute && subprocs == null) || !reExecute; } while (reExecute); - // Submit the new subprocedures if (subprocs != null && !procedure.isFailed()) { submitChildrenProcedures(subprocs); } - // if the procedure is complete and has a parent, count down the children latch - if (procedure.isFinished() && procedure.hasParent()) { + // if the procedure is complete and has a parent, count down the children latch. + // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
+ if (!suspended && procedure.isFinished() && procedure.hasParent()) { countDownChildren(procStack, procedure); } } @@ -1469,18 +1527,13 @@ public class ProcedureExecutor { } // If this procedure is the last child awake the parent procedure - final boolean traceEnabled = LOG.isTraceEnabled(); - if (traceEnabled) { - LOG.trace(parent + " child is done: " + procedure); - } - - if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { - parent.setState(ProcedureState.RUNNABLE); + LOG.info("Finish suprocedure " + procedure); + if (parent.tryRunnable()) { + // If we succeeded in making the parent runnable -- i.e. all of its + // children have completed, move parent to front of the queue. store.update(parent); scheduler.addFront(parent); - if (traceEnabled) { - LOG.trace(parent + " all the children finished their work, resume."); - } + LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing."); return; } } @@ -1569,9 +1622,10 @@ public class ProcedureExecutor { // ========================================================================== private final class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); + private Procedure activeProcedure; public WorkerThread(final ThreadGroup group) { - super(group, "ProcExecWorker-" + workerId.incrementAndGet()); + super(group, "ProcExecWrkr-" + workerId.incrementAndGet()); } @Override @@ -1581,29 +1635,49 @@ public class ProcedureExecutor { @Override public void run() { - final boolean traceEnabled = LOG.isTraceEnabled(); long lastUpdate = EnvironmentEdgeManager.currentTime(); - while (isRunning() && keepAlive(lastUpdate)) { - final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (procedure == null) continue; - - store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); - executionStartTime.set(EnvironmentEdgeManager.currentTime()); - try { - if (traceEnabled) { - 
LOG.trace("Trying to start the execution of " + procedure); + try { + while (isRunning() && keepAlive(lastUpdate)) { + this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (this.activeProcedure == null) continue; + int activeCount = activeExecutorCount.incrementAndGet(); + int runningCount = store.setRunningProcedureCount(activeCount); + if (LOG.isTraceEnabled()) { + LOG.trace("Execute pid=" + this.activeProcedure.getProcId() + + " runningCount=" + runningCount + ", activeCount=" + activeCount); + } + executionStartTime.set(EnvironmentEdgeManager.currentTime()); + try { + executeProcedure(this.activeProcedure); + } catch (AssertionError e) { + LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e); + throw e; + } finally { + activeCount = activeExecutorCount.decrementAndGet(); + runningCount = store.setRunningProcedureCount(activeCount); + if (LOG.isTraceEnabled()) { + LOG.trace("Halt pid=" + this.activeProcedure.getProcId() + + " runningCount=" + runningCount + ", activeCount=" + activeCount); + } + this.activeProcedure = null; + lastUpdate = EnvironmentEdgeManager.currentTime(); + executionStartTime.set(Long.MAX_VALUE); } - executeProcedure(procedure); - } finally { - store.setRunningProcedureCount(activeExecutorCount.decrementAndGet()); - lastUpdate = EnvironmentEdgeManager.currentTime(); - executionStartTime.set(Long.MAX_VALUE); } + } catch (Throwable t) { + LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t); + } finally { + LOG.debug("Worker terminated."); } - LOG.debug("Worker thread terminated " + this); workerThreads.remove(this); } + @Override + public String toString() { + Procedure p = this.activeProcedure; + return getName() + "(pid=" + (p == null? 
Procedure.NO_PROC_ID: p.getProcId() + ")"); + } + /** * @return the time since the current procedure is running */ @@ -1617,14 +1691,15 @@ public class ProcedureExecutor { } } - // ========================================================================== - // Timeout Thread - // ========================================================================== + /** + * Runs task on a period such as check for stuck workers. + * @see InlineChore + */ private final class TimeoutExecutorThread extends StoppableThread { private final DelayQueue queue = new DelayQueue<>(); public TimeoutExecutorThread(final ThreadGroup group) { - super(group, "ProcedureTimeoutExecutor"); + super(group, "ProcExecTimeout"); } @Override @@ -1634,7 +1709,7 @@ public class ProcedureExecutor { @Override public void run() { - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); while (isRunning()) { final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); if (task == null || task == DelayedUtil.DELAYED_POISON) { @@ -1643,8 +1718,8 @@ public class ProcedureExecutor { continue; } - if (isTraceEnabled) { - LOG.trace("Trying to start the execution of " + task); + if (traceEnabled) { + LOG.trace("Executing " + task); } // execute the task @@ -1665,6 +1740,8 @@ public class ProcedureExecutor { public void add(final Procedure procedure) { assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; + LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + + ", timestamp=" + procedure.getTimeoutTimestamp()); queue.add(new DelayedProcedure(procedure)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java index bdced10..b148dae 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; /** * Special procedure used as a chore. - * instead of bringing the Chore class in (dependencies reason), + * Instead of bringing the Chore class in (dependencies reason), * we reuse the executor timeout thread for this special case. * * The assumption is that procedure is used as hook to dispatch other procedures @@ -43,7 +43,7 @@ public abstract class ProcedureInMemoryChore extends Procedure[] execute(final TEnvironment env) { throw new UnsupportedOperationException(); } @@ -66,4 +66,4 @@ public abstract class ProcedureInMemoryChore extends Procedure listLocks(); /** http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java new file mode 100644 index 0000000..8d5ff3c --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.collect.ArrayListMultimap; + +/** + * A procedure dispatcher that aggregates and sends after elapsed time or after we hit + * count threshold. Creates its own threadpool to run RPCs with timeout. + *
    + *
  • Each server queue has a dispatch buffer
  • + *
  • Once the dispatch buffer reaches a threshold-size/time we send
  • + *
+ *

Call {@link #start()} and then {@link #submitTask(Callable)}. When done, + * call {@link #stop()}. + */ +@InterfaceAudience.Private +public abstract class RemoteProcedureDispatcher> { + private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class); + + public static final String THREAD_POOL_SIZE_CONF_KEY = + "hbase.procedure.remote.dispatcher.threadpool.size"; + private static final int DEFAULT_THREAD_POOL_SIZE = 128; + + public static final String DISPATCH_DELAY_CONF_KEY = + "hbase.procedure.remote.dispatcher.delay.msec"; + private static final int DEFAULT_DISPATCH_DELAY = 150; + + public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = + "hbase.procedure.remote.dispatcher.max.queue.size"; + private static final int DEFAULT_MAX_QUEUE_SIZE = 32; + + private final AtomicBoolean running = new AtomicBoolean(false); + private final ConcurrentHashMap nodeMap = + new ConcurrentHashMap(); + + private final int operationDelay; + private final int queueMaxSize; + private final int corePoolSize; + + private TimeoutExecutorThread timeoutExecutor; + private ThreadPoolExecutor threadPool; + + protected RemoteProcedureDispatcher(Configuration conf) { + this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); + this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); + this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); + } + + public boolean start() { + if (running.getAndSet(true)) { + LOG.warn("Already running"); + return false; + } + + LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize + + ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay); + + // Create the timeout executor + timeoutExecutor = new TimeoutExecutorThread(); + timeoutExecutor.start(); + + // Create the thread pool that will execute RPCs + threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, + 
Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), + getUncaughtExceptionHandler())); + return true; + } + + public boolean stop() { + if (!running.getAndSet(false)) { + return false; + } + + LOG.info("Stopping procedure remote dispatcher"); + + // send stop signals + timeoutExecutor.sendStopSignal(); + threadPool.shutdownNow(); + return true; + } + + public void join() { + assert !running.get() : "expected not running"; + + // wait the timeout executor + timeoutExecutor.awaitTermination(); + timeoutExecutor = null; + + // wait for the thread pool to terminate + threadPool.shutdownNow(); + try { + while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("Waiting for thread-pool to terminate"); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for thread-pool termination", e); + } + } + + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Failed to execute remote procedures " + t.getName(), e); + } + }; + } + + // ============================================================================================ + // Node Helpers + // ============================================================================================ + /** + * Add a node that will be able to execute remote procedures + * @param key the node identifier + */ + public void addNode(final TRemote key) { + assert key != null: "Tried to add a node with a null key"; + final BufferNode newNode = new BufferNode(key); + nodeMap.putIfAbsent(key, newNode); + } + + /** + * Add a remote rpc. Be sure to check result for successful add. + * @param key the node identifier + * @return True if we successfully added the operation. 
+ */ + public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) { + assert key != null : "found null key for node"; + BufferNode node = nodeMap.get(key); + if (node == null) { + return false; + } + node.add(rp); + // Check our node still in the map; could have been removed by #removeNode. + return nodeMap.contains(node); + } + + /** + * Remove a remote node + * @param key the node identifier + */ + public boolean removeNode(final TRemote key) { + final BufferNode node = nodeMap.remove(key); + if (node == null) return false; + node.abortOperationsInQueue(); + return true; + } + + // ============================================================================================ + // Task Helpers + // ============================================================================================ + protected Future submitTask(Callable task) { + return threadPool.submit(task); + } + + protected Future submitTask(Callable task, long delay, TimeUnit unit) { + final FutureTask futureTask = new FutureTask(task); + timeoutExecutor.add(new DelayedTask(futureTask, delay, unit)); + return futureTask; + } + + protected abstract void remoteDispatch(TRemote key, Set operations); + protected abstract void abortPendingOperations(TRemote key, Set operations); + + /** + * Data structure with reference to remote operation. + */ + public static abstract class RemoteOperation { + private final RemoteProcedure remoteProcedure; + + protected RemoteOperation(final RemoteProcedure remoteProcedure) { + this.remoteProcedure = remoteProcedure; + } + + public RemoteProcedure getRemoteProcedure() { + return remoteProcedure; + } + } + + /** + * Remote procedure reference. 
+ * @param + * @param + */ + public interface RemoteProcedure { + RemoteOperation remoteCallBuild(TEnv env, TRemote remote); + void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response); + void remoteCallFailed(TEnv env, TRemote remote, IOException exception); + } + + /** + * Account of what procedures are running on remote node. + * @param + * @param + */ + public interface RemoteNode { + TRemote getKey(); + void add(RemoteProcedure operation); + void dispatch(); + } + + protected ArrayListMultimap, RemoteOperation> buildAndGroupRequestByType(final TEnv env, + final TRemote remote, final Set operations) { + final ArrayListMultimap, RemoteOperation> requestByType = ArrayListMultimap.create(); + for (RemoteProcedure proc: operations) { + RemoteOperation operation = proc.remoteCallBuild(env, remote); + requestByType.put(operation.getClass(), operation); + } + return requestByType; + } + + protected List fetchType( + final ArrayListMultimap, RemoteOperation> requestByType, final Class type) { + return (List)requestByType.removeAll(type); + } + + // ============================================================================================ + // Timeout Helpers + // ============================================================================================ + private final class TimeoutExecutorThread extends Thread { + private final DelayQueue queue = new DelayQueue(); + + public TimeoutExecutorThread() { + super("ProcedureDispatcherTimeoutThread"); + } + + @Override + public void run() { + while (running.get()) { + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + if (task == null || task == DelayedUtil.DELAYED_POISON) { + // the executor may be shutting down, and the task is just the shutdown request + continue; + } + if (task instanceof DelayedTask) { + threadPool.execute(((DelayedTask)task).getObject()); + } else { + ((BufferNode)task).dispatch(); + } + } + } + + public void add(final DelayedWithTimeout delayed) { + 
queue.add(delayed); + } + + public void remove(final DelayedWithTimeout delayed) { + queue.remove(delayed); + } + + public void sendStopSignal() { + queue.add(DelayedUtil.DELAYED_POISON); + } + + public void awaitTermination() { + try { + final long startTime = EnvironmentEdgeManager.currentTime(); + for (int i = 0; isAlive(); ++i) { + sendStopSignal(); + join(250); + if (i > 0 && (i % 8) == 0) { + LOG.warn("Waiting termination of thread " + getName() + ", " + + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); + } + } + } catch (InterruptedException e) { + LOG.warn(getName() + " join wait got interrupted", e); + } + } + } + + // ============================================================================================ + // Internals Helpers + // ============================================================================================ + + /** + * Node that contains a set of RemoteProcedures + */ + protected final class BufferNode extends DelayedContainerWithTimestamp + implements RemoteNode { + private Set operations; + + protected BufferNode(final TRemote key) { + super(key, 0); + } + + public TRemote getKey() { + return getObject(); + } + + public synchronized void add(final RemoteProcedure operation) { + if (this.operations == null) { + this.operations = new HashSet<>(); + setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); + timeoutExecutor.add(this); + } + this.operations.add(operation); + if (this.operations.size() > queueMaxSize) { + timeoutExecutor.remove(this); + dispatch(); + } + } + + public synchronized void dispatch() { + if (operations != null) { + remoteDispatch(getKey(), operations); + this.operations = null; + } + } + + public synchronized void abortOperationsInQueue() { + if (operations != null) { + abortPendingOperations(getKey(), operations); + this.operations = null; + } + } + + @Override + public String toString() { + return super.toString() + ", operations=" + this.operations; + } + } + + /** + 
* Delayed object that holds a FutureTask. + * used to submit something later to the thread-pool. + */ + private static final class DelayedTask extends DelayedContainerWithTimestamp> { + public DelayedTask(final FutureTask task, final long delay, final TimeUnit unit) { + super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); + } + }; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 1a84070..64bb278 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -27,12 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData; /** - * A SequentialProcedure describes one step in a procedure chain. + * A SequentialProcedure describes one step in a procedure chain: + *

  *   -> Step 1 -> Step 2 -> Step 3
- *
+ * 
* The main difference from a base Procedure is that the execute() of a - * SequentialProcedure will be called only once, there will be no second - * execute() call once the child are finished. which means once the child + * SequentialProcedure will be called only once; there will be no second + * execute() call once the children are finished. which means once the child * of a SequentialProcedure are completed the SequentialProcedure is completed too. */ @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index 0590a93..becd9b7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMa /** * Procedure described by a series of steps. * - * The procedure implementor must have an enum of 'states', describing + *

The procedure implementor must have an enum of 'states', describing * the various step of the procedure. * Once the procedure is running, the procedure-framework will call executeFromState() * using the 'state' provided by the user. The first call to executeFromState() @@ -56,7 +57,7 @@ public abstract class StateMachineProcedure private int stateCount = 0; private int[] states = null; - private ArrayList subProcList = null; + private List> subProcList = null; protected enum Flow { HAS_MORE_STATE, @@ -70,7 +71,7 @@ public abstract class StateMachineProcedure * Flow.HAS_MORE_STATE if there is another step. */ protected abstract Flow executeFromState(TEnvironment env, TState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; /** * called to perform the rollback of the specified state @@ -125,12 +126,15 @@ public abstract class StateMachineProcedure * Add a child procedure to execute * @param subProcedure the child procedure */ - protected void addChildProcedure(Procedure... subProcedure) { + protected void addChildProcedure(Procedure... 
subProcedure) { + if (subProcedure == null) return; + final int len = subProcedure.length; + if (len == 0) return; if (subProcList == null) { - subProcList = new ArrayList<>(subProcedure.length); + subProcList = new ArrayList<>(len); } - for (int i = 0; i < subProcedure.length; ++i) { - Procedure proc = subProcedure[i]; + for (int i = 0; i < len; ++i) { + Procedure proc = subProcedure[i]; if (!proc.hasOwner()) proc.setOwner(getOwner()); subProcList.add(proc); } @@ -138,27 +142,23 @@ public abstract class StateMachineProcedure @Override protected Procedure[] execute(final TEnvironment env) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { updateTimestamp(); try { failIfAborted(); if (!hasMoreState() || isFailed()) return null; - TState state = getCurrentState(); if (stateCount == 0) { setNextState(getStateId(state)); } - stateFlow = executeFromState(env, state); if (!hasMoreState()) setNextState(EOF_STATE); - - if (subProcList != null && subProcList.size() != 0) { + if (subProcList != null && !subProcList.isEmpty()) { Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); subProcList = null; return subProcedures; } - return (isWaiting() || isFailed() || !hasMoreState()) ? 
null : new Procedure[] {this}; } finally { updateTimestamp(); http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index c03e326..9e53f42 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void setRunningProcedureCount(final int count) { - // no-op + public int setRunningProcedureCount(final int count) { + return count; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 385cedb..a690c81 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -153,8 +153,9 @@ public interface ProcedureStore { /** * Set the number of procedure running. * This can be used, for example, by the store to know how long to wait before a sync. + * @return how many procedures are running (may not be same as count). 
*/ - void setRunningProcedureCount(int count); + int setRunningProcedureCount(int count); /** * Acquire the lease for the procedure store. http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 012ddeb..95a1ef6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -155,9 +155,23 @@ public class ProcedureWALFile implements Comparable { this.logSize += size; } - public void removeFile() throws IOException { + public void removeFile(final Path walArchiveDir) throws IOException { close(); - fs.delete(logFile, false); + boolean archived = false; + if (walArchiveDir != null) { + Path archivedFile = new Path(walArchiveDir, logFile.getName()); + LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) 
" + logFile + " to " + archivedFile); + if (!fs.rename(logFile, archivedFile)) { + LOG.warn("Failed archive of " + logFile + ", deleting"); + } else { + archived = true; + } + } + if (!archived) { + if (!fs.delete(logFile, false)) { + LOG.warn("Failed delete of " + logFile); + } + } } public void setProcIds(long minId, long maxId) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index c672045..0a05e6e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -83,11 +83,11 @@ public class ProcedureWALFormatReader { // // Fast Start: INIT/INSERT record and StackIDs // --------------------------------------------- - // We have two special record, INIT and INSERT that tracks the first time - // the procedure was added to the WAL. We can use that information to be able - // to start procedures before reaching the end of the WAL, or before reading all the WALs. - // but in some cases the WAL with that record can be already gone. - // In alternative we can use the stackIds on each procedure, + // We have two special records, INIT and INSERT, that track the first time + // the procedure was added to the WAL. We can use this information to be able + // to start procedures before reaching the end of the WAL, or before reading all WALs. + // But in some cases, the WAL with that record can be already gone. 
+ // As an alternative, we can use the stackIds on each procedure, // to identify when a procedure is ready to start. // If there are gaps in the sum of the stackIds we need to read more WALs. // @@ -107,16 +107,16 @@ public class ProcedureWALFormatReader { * Global tracker that will be used by the WALProcedureStore after load. * If the last WAL was closed cleanly we already have a full tracker ready to be used. * If the last WAL was truncated (e.g. master killed) the tracker will be empty - * and the 'partial' flag will be set. In this case on WAL replay we are going + * and the 'partial' flag will be set. In this case, on WAL replay we are going * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // private final boolean hasFastStartSupport; + // TODO: private final boolean hasFastStartSupport; /** * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we * re-build the list of procedures updated in that WAL because we need it for log cleaning - * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. + * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. * (see {@link WALProcedureStore#removeInactiveLogs()}). * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother * re-building it. @@ -137,7 +137,7 @@ public class ProcedureWALFormatReader { public void read(final ProcedureWALFile log) throws IOException { localTracker = log.getTracker().isPartial() ? log.getTracker() : null; if (localTracker != null) { - LOG.info("Rebuilding tracker for log - " + log); + LOG.info("Rebuilding tracker for " + log); } FSDataInputStream stream = log.getStream(); @@ -146,7 +146,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("nothing left to decode. 
exiting with missing EOF"); + LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); break; } switch (entry.getType()) { @@ -171,7 +171,7 @@ public class ProcedureWALFormatReader { } } } catch (InvalidProtocolBufferException e) { - LOG.error("got an exception while reading the procedure WAL: " + log, e); + LOG.error("While reading procedure from " + log, e); loader.markCorruptedWAL(log, e); } @@ -211,7 +211,7 @@ public class ProcedureWALFormatReader { maxProcId = Math.max(maxProcId, proc.getProcId()); if (isRequired(proc.getProcId())) { if (LOG.isTraceEnabled()) { - LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); + LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId()); } localProcedureMap.add(proc); if (tracker.isPartial()) { @@ -296,7 +296,7 @@ public class ProcedureWALFormatReader { // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G // // We also have a lazy grouping by "root procedure", and a list of - // unlinked procedure. If after reading all the WALs we have unlinked + // unlinked procedures. If after reading all the WALs we have unlinked // procedures it means that we had a missing WAL or a corruption. // rootHead = A <-> D <-> G // B E @@ -639,17 +639,17 @@ public class ProcedureWALFormatReader { * "ready" means that we all the information that we need in-memory. * * Example-1: - * We have two WALs, we start reading fronm the newest (wal-2) + * We have two WALs, we start reading from the newest (wal-2) * wal-2 | C B | * wal-1 | A B C | * * If C and B don't depend on A (A is not the parent), we can start them - * before reading wal-1. If B is the only one with parent A we can start C - * and read one more WAL before being able to start B. + * before reading wal-1. If B is the only one with parent A we can start C. + * We have to read one more WAL before being able to start B. * * How do we know with the only information in B that we are not ready. 
* - easy case, the parent is missing from the global map - * - more complex case we look at the Stack IDs + * - more complex case we look at the Stack IDs. * * The Stack-IDs are added to the procedure order as incremental index * tracking how many times that procedure was executed, which is equivalent @@ -664,7 +664,7 @@ public class ProcedureWALFormatReader { * executed before. * To identify when a Procedure is ready we do the sum of the stackIds of * the procedure and the parent. if the stackIdSum is equals to the - * sum of {1..maxStackId} then everything we need is avaiable. + * sum of {1..maxStackId} then everything we need is available. * * Example-2 * wal-2 | A | A stackIds = [0, 2] @@ -676,7 +676,7 @@ public class ProcedureWALFormatReader { assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; if (rootEntry.isFinished()) { - // if the root procedure is finished, sub-procedures should be gone + // If the root procedure is finished, sub-procedures should be gone if (rootEntry.childHead != null) { LOG.error("unexpected active children for root-procedure: " + rootEntry); for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 4712c30..1791cae 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -66,6 +66,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public class WALProcedureStore extends 
ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); + public static final String LOG_PREFIX = "pv2-"; public interface LeaseRecovery { void recoverFileLease(FileSystem fs, Path path) throws IOException; @@ -124,6 +125,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final Configuration conf; private final FileSystem fs; private final Path walDir; + private final Path walArchiveDir; private final AtomicReference syncException = new AtomicReference<>(); private final AtomicBoolean loading = new AtomicBoolean(true); @@ -185,9 +187,15 @@ public class WALProcedureStore extends ProcedureStoreBase { public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, final LeaseRecovery leaseRecovery) { + this(conf, fs, walDir, null, leaseRecovery); + } + + public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, + final Path walArchiveDir, final LeaseRecovery leaseRecovery) { this.fs = fs; this.conf = conf; this.walDir = walDir; + this.walArchiveDir = walArchiveDir; this.leaseRecovery = leaseRecovery; } @@ -239,6 +247,16 @@ public class WALProcedureStore extends ProcedureStoreBase { } }; syncThread.start(); + + // Create archive dir up front. Rename won't work w/o it up on HDFS. + if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { + if (this.fs.mkdirs(this.walArchiveDir)) { + if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " + + this.walArchiveDir); + } else { + LOG.warn("Failed create of " + this.walArchiveDir); + } + } } @Override @@ -292,9 +310,9 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void setRunningProcedureCount(final int count) { - LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length); + public int setRunningProcedureCount(final int count) { this.runningProcCount = count > 0 ? 
Math.min(count, slots.length) : slots.length; + return this.runningProcCount; } public ProcedureStoreTracker getStoreTracker() { @@ -343,7 +361,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (LOG.isDebugEnabled()) { LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); } - logs.getLast().removeFile(); + logs.getLast().removeFile(this.walArchiveDir); continue; } @@ -955,7 +973,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); - logs.getLast().removeFile(); + logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1047,7 +1065,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. // once there is nothing olding the oldest WAL we can remove it. 
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { - removeLogFile(logs.getFirst()); + removeLogFile(logs.getFirst(), walArchiveDir); buildHoldingCleanupTracker(); } @@ -1079,8 +1097,8 @@ public class WALProcedureStore extends ProcedureStoreBase { private void removeAllLogs(long lastLogId) { if (logs.size() <= 1) return; - if (LOG.isDebugEnabled()) { - LOG.debug("Remove all state logs with ID less than " + lastLogId); + if (LOG.isTraceEnabled()) { + LOG.trace("Remove all state logs with ID less than " + lastLogId); } boolean removed = false; @@ -1089,7 +1107,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (lastLogId < log.getLogId()) { break; } - removeLogFile(log); + removeLogFile(log, walArchiveDir); removed = true; } @@ -1098,15 +1116,15 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private boolean removeLogFile(final ProcedureWALFile log) { + private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { try { if (LOG.isTraceEnabled()) { LOG.trace("Removing log=" + log); } - log.removeFile(); + log.removeFile(walArchiveDir); logs.remove(log); if (LOG.isDebugEnabled()) { - LOG.info("Removed log=" + log + " activeLogs=" + logs); + LOG.info("Removed log=" + log + ", activeLogs=" + logs); } assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { @@ -1128,7 +1146,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } protected Path getLogFilePath(final long logId) throws IOException { - return new Path(walDir, String.format("state-%020d.log", logId)); + return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); } private static long getLogIdFromName(final String name) { @@ -1141,7 +1159,7 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public boolean accept(Path path) { String name = path.getName(); - return name.startsWith("state-") && name.endsWith(".log"); + return name.startsWith(LOG_PREFIX) && 
name.endsWith(".log"); } }; @@ -1192,7 +1210,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i]); + ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); if (log != null) { this.logs.add(log); } @@ -1222,21 +1240,22 @@ public class WALProcedureStore extends ProcedureStoreBase { /** * Loads given log file and it's tracker. */ - private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { + private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { LOG.warn("Remove uninitialized log: " + logFile); - log.removeFile(); + log.removeFile(walArchiveDir); return null; } if (LOG.isDebugEnabled()) { - LOG.debug("Opening state-log: " + logFile); + LOG.debug("Opening Pv2 " + logFile); } try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { LOG.warn("Remove uninitialized log: " + logFile, e); - log.removeFile(); + log.removeFile(walArchiveDir); return null; } catch (IOException e) { String msg = "Unable to read state log: " + logFile; http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java index cde37bd..faf8e7e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +// FIX namings. TODO. @InterfaceAudience.Private @InterfaceStability.Evolving public final class DelayedUtil { @@ -148,6 +149,9 @@ public final class DelayedUtil { } } + /** + * Has a timeout. + */ public static class DelayedContainerWithTimestamp extends DelayedContainer { private long timeout; @@ -165,4 +169,4 @@ public final class DelayedUtil { this.timeout = timeout; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/1e82848a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java index 408cffd..78daf5a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java @@ -42,7 +42,7 @@ public class TestProcedureToString { */ static class BasicProcedure extends Procedure { @Override - protected Procedure[] execute(BasicProcedureEnv env) + protected Procedure[] execute(BasicProcedureEnv env) throws ProcedureYieldException, InterruptedException { return new Procedure [] {this}; } @@ -78,8 +78,6 @@ public class TestProcedureToString { } } - - /** * Test that I can override the toString for its state value. * @throws ProcedureYieldException