Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 15585200CCF for ; Mon, 24 Jul 2017 21:08:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 13E0A165217; Mon, 24 Jul 2017 19:08:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BA21F163670 for ; Mon, 24 Jul 2017 21:08:11 +0200 (CEST) Received: (qmail 21845 invoked by uid 500); 24 Jul 2017 19:08:06 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 19923 invoked by uid 99); 24 Jul 2017 19:08:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Jul 2017 19:08:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7897CE965C; Mon, 24 Jul 2017 19:08:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 24 Jul 2017 19:08:19 -0000 Message-Id: <5b137403cc5a497b91b2bf307c8dafed@git.apache.org> In-Reply-To: <73c89cd8b4b0477088a5e6701bc60f7c@git.apache.org> References: <73c89cd8b4b0477088a5e6701bc60f7c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. 
archived-at: Mon, 24 Jul 2017 19:08:14 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2d5075d7/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.CompletedProcedureRetainer.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.CompletedProcedureRetainer.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.CompletedProcedureRetainer.html new file mode 100644 index 0000000..904b921 --- /dev/null +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.CompletedProcedureRetainer.html @@ -0,0 +1,2074 @@ + + + +Source code + + + +
+
001/**
+002 * Licensed to the Apache Software Foundation (ASF) under one
+003 * or more contributor license agreements.  See the NOTICE file
+004 * distributed with this work for additional information
+005 * regarding copyright ownership.  The ASF licenses this file
+006 * to you under the Apache License, Version 2.0 (the
+007 * "License"); you may not use this file except in compliance
+008 * with the License.  You may obtain a copy of the License at
+009 *
+010 *     http://www.apache.org/licenses/LICENSE-2.0
+011 *
+012 * Unless required by applicable law or agreed to in writing, software
+013 * distributed under the License is distributed on an "AS IS" BASIS,
+014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+015 * See the License for the specific language governing permissions and
+016 * limitations under the License.
+017 */
+018
+019package org.apache.hadoop.hbase.procedure2;
+020
+021import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+022import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+023
+024import java.io.IOException;
+025import java.io.InputStream;
+026import java.io.OutputStream;
+027import java.util.ArrayList;
+028import java.util.Arrays;
+029import java.util.HashSet;
+030import java.util.Iterator;
+031import java.util.List;
+032import java.util.Map;
+033import java.util.Objects;
+034import java.util.Set;
+035import java.util.concurrent.atomic.AtomicBoolean;
+036import java.util.concurrent.atomic.AtomicInteger;
+037import java.util.concurrent.atomic.AtomicLong;
+038import java.util.stream.Collectors;
+039import java.util.stream.Stream;
+040import java.util.concurrent.ConcurrentHashMap;
+041import java.util.concurrent.CopyOnWriteArrayList;
+042import java.util.concurrent.DelayQueue;
+043import java.util.concurrent.TimeUnit;
+044
+045import org.apache.commons.logging.Log;
+046import org.apache.commons.logging.LogFactory;
+047import org.apache.hadoop.conf.Configuration;
+048import org.apache.hadoop.hbase.HConstants;
+049import org.apache.hadoop.hbase.classification.InterfaceAudience;
+050import org.apache.hadoop.hbase.classification.InterfaceStability;
+051import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+052import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
+053import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+054import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+055import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+056import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+057import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+058import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+059import org.apache.hadoop.hbase.security.User;
+060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+061import org.apache.hadoop.hbase.util.NonceKey;
+062import org.apache.hadoop.hbase.util.Threads;
+063
+064/**
+065 * Thread Pool that executes the submitted procedures.
+066 * The executor has a ProcedureStore associated.
+067 * Each operation is logged and on restart the pending procedures are resumed.
+068 *
+069 * Unless the Procedure code throws an error (e.g. invalid user input)
+070 * the procedure will complete (at some point in time). On restart the pending
+071 * procedures are resumed and the ones that failed will be rolled back.
+072 *
+073 * The user can add procedures to the executor via submitProcedure(proc)
+074 * check for the finished state via isFinished(procId)
+075 * and get the result via getResult(procId)
+076 */
+077@InterfaceAudience.Private
+078@InterfaceStability.Evolving
+079public class ProcedureExecutor<TEnvironment> {
+080  private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
+081
+082  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
+083  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
+084
+085  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
+086      "hbase.procedure.worker.keep.alive.time.msec";
+087  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE;
+088
+089  Testing testing = null;
+090  public static class Testing {
+091    protected boolean killIfSuspended = false;
+092    protected boolean killBeforeStoreUpdate = false;
+093    protected boolean toggleKillBeforeStoreUpdate = false;
+094
+095    protected boolean shouldKillBeforeStoreUpdate() {
+096      final boolean kill = this.killBeforeStoreUpdate;
+097      if (this.toggleKillBeforeStoreUpdate) {
+098        this.killBeforeStoreUpdate = !kill;
+099        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
+100      }
+101      return kill;
+102    }
+103
+104    protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
+105      return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
+106    }
+107  }
+108
+109  public interface ProcedureExecutorListener {
+110    void procedureLoaded(long procId);
+111    void procedureAdded(long procId);
+112    void procedureFinished(long procId);
+113  }
+114
+115  private static class CompletedProcedureRetainer {
+116    private final Procedure<?> procedure;
+117    private long clientAckTime;
+118
+119    public CompletedProcedureRetainer(Procedure<?> procedure) {
+120      this.procedure = procedure;
+121      clientAckTime = -1;
+122    }
+123
+124    public Procedure<?> getProcedure() {
+125      return procedure;
+126    }
+127
+128    public boolean hasClientAckTime() {
+129      return clientAckTime != -1;
+130    }
+131
+132    public long getClientAckTime() {
+133      return clientAckTime;
+134    }
+135
+136    public void setClientAckTime(long clientAckTime) {
+137      this.clientAckTime = clientAckTime;
+138    }
+139
+140    public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
+141      return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
+142        (now - procedure.getLastUpdate()) >= evictTtl;
+143    }
+144  }
+145
+146  /**
+147   * Internal cleaner that removes the completed procedure results after a TTL.
+148   * NOTE: This is a special case handled in timeoutLoop().
+149   *
+150   * <p>Since the client code looks more or less like:
+151   * <pre>
+152   *   procId = master.doOperation()
+153   *   while (master.getProcResult(procId) == ProcInProgress);
+154   * </pre>
+155   * The master should not throw away the proc result as soon as the procedure is done
+156   * but should wait for a result request from the client (see executor.removeResult(procId)).
+157   * The client will call something like master.isProcDone() or master.getProcResult()
+158   * which will return the result/state to the client, and it will mark the completed
+159   * proc as ready to delete. Note that the client may not receive the response from
+160   * the master (e.g. master failover), so if we delay the real deletion of
+161   * the proc result a bit, the client will be able to get the result on the next try.
+162   */
+163  private static class CompletedProcedureCleaner<TEnvironment>
+164      extends ProcedureInMemoryChore<TEnvironment> {
+165    private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
+166
+167    private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+168    private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+169
+170    private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+171    private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+172
+173    private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+174    private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+175
+176    private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
+177    private static final int DEFAULT_BATCH_SIZE = 32;
+178
+179    private final Map<Long, CompletedProcedureRetainer> completed;
+180    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
+181    private final ProcedureStore store;
+182    private Configuration conf;
+183
+184    public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
+185        final Map<Long, CompletedProcedureRetainer> completedMap,
+186        final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
+187      // set the timeout interval that triggers the periodic-procedure
+188      super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+189      this.completed = completedMap;
+190      this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
+191      this.store = store;
+192      this.conf = conf;
+193    }
+194
+195    @Override
+196    protected void periodicExecute(final TEnvironment env) {
+197      if (completed.isEmpty()) {
+198        if (LOG.isTraceEnabled()) {
+199          LOG.trace("No completed procedures to cleanup.");
+200        }
+201        return;
+202      }
+203
+204      final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+205      final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+206      final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+207
+208      final long[] batchIds = new long[batchSize];
+209      int batchCount = 0;
+210
+211      final long now = EnvironmentEdgeManager.currentTime();
+212      final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
+213      final boolean debugEnabled = LOG.isDebugEnabled();
+214      while (it.hasNext() && store.isRunning()) {
+215        final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
+216        final CompletedProcedureRetainer retainer = entry.getValue();
+217        final Procedure<?> proc = retainer.getProcedure();
+218
+219        // TODO: Select TTL based on Procedure type
+220        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
+221          if (debugEnabled) {
+222            LOG.debug("Evict completed " + proc);
+223          }
+224          batchIds[batchCount++] = entry.getKey();
+225          if (batchCount == batchIds.length) {
+226            store.delete(batchIds, 0, batchCount);
+227            batchCount = 0;
+228          }
+229          it.remove();
+230
+231          final NonceKey nonceKey = proc.getNonceKey();
+232          if (nonceKey != null) {
+233            nonceKeysToProcIdsMap.remove(nonceKey);
+234          }
+235        }
+236      }
+237      if (batchCount > 0) {
+238        store.delete(batchIds, 0, batchCount);
+239      }
+240    }
+241  }
+242
+243  /**
+244   * Map the procId returned by submitProcedure(), the Root-ProcID, to the CompletedProcedureRetainer.
+245   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
+246   * The user of ProcedureExecutor should call getResult(procId) to get the result.
+247   */
+248  private final ConcurrentHashMap<Long, CompletedProcedureRetainer> completed = new ConcurrentHashMap<>();
+249
+250  /**
+251   * Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
+252   * The RootProcedureState contains the execution stack of the Root-Procedure.
+253   * It is added to the map by submitProcedure() and removed on procedure completion.
+254   */
+255  private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = new ConcurrentHashMap<>();
+256
+257  /**
+258   * Helper map to look up the live procedures by ID.
+259   * This map contains every procedure: root-procedures and subprocedures.
+260   */
+261  private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>();
+262
+263  /**
+264   * Helper map to look up whether the procedure was already issued by the same client.
+265   * This map contains every root procedure.
+266   */
+267  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
+268
+269  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();
+270
+271  private Configuration conf;
+272  private ThreadGroup threadGroup;
+273  private CopyOnWriteArrayList<WorkerThread> workerThreads;
+274  private TimeoutExecutorThread timeoutExecutor;
+275  private int corePoolSize;
+276
+277  private volatile long keepAliveTime = Long.MAX_VALUE;
+278
+279  /**
+280   * Scheduler/Queue that contains runnable procedures.
+281   */
+282  private final ProcedureScheduler scheduler;
+283
+284  private final AtomicLong lastProcId = new AtomicLong(-1);
+285  private final AtomicLong workerId = new AtomicLong(0);
+286  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
+287  private final AtomicBoolean running = new AtomicBoolean(false);
+288  private final TEnvironment environment;
+289  private final ProcedureStore store;
+290
+291  private final boolean checkOwnerSet;
+292
+293  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+294      final ProcedureStore store) {
+295    this(conf, environment, store, new SimpleProcedureScheduler());
+296  }
+297
+298  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+299      final ProcedureStore store, final ProcedureScheduler scheduler) {
+300    this.environment = environment;
+301    this.scheduler = scheduler;
+302    this.store = store;
+303    this.conf = conf;
+304    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
+305    refreshConfiguration(conf);
+306  }
+307
+308  private void load(final boolean abortOnCorruption) throws IOException {
+309    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
+310    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
+311    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
+312    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
+313
+314    store.load(new ProcedureStore.ProcedureLoader() {
+315      @Override
+316      public void setMaxProcId(long maxProcId) {
+317        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
+318        LOG.debug("Load maxProcId=" + maxProcId);
+319        lastProcId.set(maxProcId);
+320      }
+321
+322      @Override
+323      public void load(ProcedureIterator procIter) throws IOException {
+324        loadProcedures(procIter, abortOnCorruption);
+325      }
+326
+327      @Override
+328      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+329        int corruptedCount = 0;
+330        while (procIter.hasNext()) {
+331          Procedure<?> proc = procIter.next();
+332          LOG.error("Corrupt " + proc);
+333          corruptedCount++;
+334        }
+335        if (abortOnCorruption && corruptedCount > 0) {
+336          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
+337        }
+338      }
+339    });
+340  }
+341
+342  private void loadProcedures(final ProcedureIterator procIter,
+343      final boolean abortOnCorruption) throws IOException {
+344    final boolean debugEnabled = LOG.isDebugEnabled();
+345
+346    // 1. Build the rollback stack
+347    int runnablesCount = 0;
+348    while (procIter.hasNext()) {
+349      boolean finished = procIter.isNextFinished();
+350      Procedure proc = procIter.next();
+351      NonceKey nonceKey = proc.getNonceKey();
+352      long procId = proc.getProcId();
+353
+354      if (finished) {
+355        completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc));
+356        if (debugEnabled) {
+357          LOG.debug("Completed " + proc);
+358        }
+359      } else {
+360        if (!proc.hasParent()) {
+361          assert !proc.isFinished() : "unexpected finished procedure";
+362          rollbackStack.put(proc.getProcId(), new RootProcedureState());
+363        }
+364
+365        // add the procedure to the map
+366        proc.beforeReplay(getEnvironment());
+367        procedures.put(proc.getProcId(), proc);
+368
+369        if (proc.getState() == ProcedureState.RUNNABLE) {
+370          runnablesCount++;
+371        }
+372      }
+373
+374      // add the nonce to the map
+375      if (nonceKey != null) {
+376        nonceKeysToProcIdsMap.put(nonceKey, procId);
+377      }
+378    }
+379
+380    // 2. Initialize the stacks
+381    final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
+382    HashSet<Procedure> waitingSet = null;
+383    procIter.reset();
+384    while (procIter.hasNext()) {
+385      if (procIter.isNextFinished()) {
+386        procIter.skipNext();
+387        continue;
+388      }
+389
+390      Procedure proc = procIter.next();
+391      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
+392
+393      if (debugEnabled) {
+394        LOG.debug(String.format("Loading %s", proc));
+395      }
+396
+397      Long rootProcId = getRootProcedureId(proc);
+398      if (rootProcId == null) {
+399        // The 'proc' was ready to run but the root procedure was rolled back?
+400        scheduler.addBack(proc);
+401        continue;
+402      }
+403
+404      if (proc.hasParent()) {
+405        Procedure parent = procedures.get(proc.getParentProcId());
+406        // corrupted procedures are handled later at step 3
+407        if (parent != null && !proc.isFinished()) {
+408          parent.incChildrenLatch();
+409        }
+410      }
+411
+412      RootProcedureState procStack = rollbackStack.get(rootProcId);
+413      procStack.loadStack(proc);
+414
+415      proc.setRootProcId(rootProcId);
+416      switch (proc.getState()) {
+417        case RUNNABLE:
+418          runnableList.add(proc);
+419          break;
+420        case WAITING:
+421          if (!proc.hasChildren()) {
+422            runnableList.add(proc);
+423          }
+424          break;
+425        case WAITING_TIMEOUT:
+426          if (waitingSet == null) {
+427            waitingSet = new HashSet<>();
+428          }
+429          waitingSet.add(proc);
+430          break;
+431        case FAILED:
+432          // add the proc to the scheduler to perform the rollback
+433          scheduler.addBack(proc);
+434          break;
+435        case ROLLEDBACK:
+436        case INITIALIZING:
+437          String msg = "Unexpected " + proc.getState() + " state for " + proc;
+438          LOG.error(msg);
+439          throw new UnsupportedOperationException(msg);
+440        default:
+441          break;
+442      }
+443    }
+444
+445    // 3. Validate the stacks
+446    int corruptedCount = 0;
+447    Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
+448    while (itStack.hasNext()) {
+449      Map.Entry<Long, RootProcedureState> entry = itStack.next();
+450      RootProcedureState procStack = entry.getValue();
+451      if (procStack.isValid()) continue;
+452
+453      for (Procedure proc: procStack.getSubproceduresStack()) {
+454        LOG.error("Corrupted " + proc);
+455        procedures.remove(proc.getProcId());
+456        runnableList.remove(proc);
+457        if (waitingSet != null) waitingSet.remove(proc);
+458        corruptedCount++;
+459      }
+460      itStack.remove();
+461    }
+462
+463    if (abortOnCorruption && corruptedCount > 0) {
+464      throw new IOException("found " + corruptedCount + " procedures on replay");
+465    }
+466
+467    // 4. Push the procedures to the timeout executor
+468    if (waitingSet != null && !waitingSet.isEmpty()) {
+469      for (Procedure proc: waitingSet) {
+470        proc.afterReplay(getEnvironment());
+471        timeoutExecutor.add(proc);
+472      }
+473    }
+474
+475    // 5. Push the procedure to the scheduler
+476    if (!runnableList.isEmpty()) {
+477      // TODO: See ProcedureWALFormatReader#hasFastStartSupport
+478      // some procedure may be started way before this stuff.
+479      for (int i = runnableList.size() - 1; i >= 0; --i) {
+480        Procedure proc = runnableList.get(i);
+481        proc.afterReplay(getEnvironment());
+482        if (!proc.hasParent()) {
+483          sendProcedureLoadedNotification(proc.getProcId());
+484        }
+485        if (proc.wasExecuted()) {
+486          scheduler.addFront(proc);
+487        } else {
+488          // if it was not in execution, it can wait.
+489          scheduler.addBack(proc);
+490        }
+491      }
+492    }
+493  }
+494
+495  /**
+496   * Start the procedure executor.
+497   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
+498   * recover the lease, and ensure a single executor, and start the procedure
+499   * replay to resume and recover the previous pending and in-progress procedures.
+500   *
+501   * @param numThreads number of threads available for procedure execution.
+502   * @param abortOnCorruption true if you want to abort your service in case
+503   *          a corrupted procedure is found on replay. otherwise false.
+504   */
+505  public void start(int numThreads, boolean abortOnCorruption) throws IOException {
+506    if (running.getAndSet(true)) {
+507      LOG.warn("Already running");
+508      return;
+509    }
+510
+511    // We have numThreads executor + one timer thread used for timing out
+512    // procedures and triggering periodic procedures.
+513    this.corePoolSize = numThreads;
+514    LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize);
+515
+516    // Create the Thread Group for the executors
+517    threadGroup = new ThreadGroup("ProcExecThrdGrp");
+518
+519    // Create the timeout executor
+520    timeoutExecutor = new TimeoutExecutorThread(threadGroup);
+521
+522    // Create the workers
+523    workerId.set(0);
+524    workerThreads = new CopyOnWriteArrayList<>();
+525    for (int i = 0; i < corePoolSize; ++i) {
+526      workerThreads.add(new WorkerThread(threadGroup));
+527    }
+528
+529    long st, et;
+530
+531    // Acquire the store lease.
+532    st = EnvironmentEdgeManager.currentTime();
+533    store.recoverLease();
+534    et = EnvironmentEdgeManager.currentTime();
+535    LOG.info(String.format("Recover store (%s) lease: %s",
+536      store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
+537
+538    // start the procedure scheduler
+539    scheduler.start();
+540
+541    // TODO: Split in two steps.
+542    // TODO: Handle corrupted procedures (currently just a warn)
+543    // The first one will make sure that we have the latest id,
+544    // so we can start the threads and accept new procedures.
+545    // The second step will do the actual load of old procedures.
+546    st = EnvironmentEdgeManager.currentTime();
+547    load(abortOnCorruption);
+548    et = EnvironmentEdgeManager.currentTime();
+549    LOG.info(String.format("Load store (%s): %s",
+550      store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
+551
+552    // Start the executors. Here we must have the lastProcId set.
+553    if (LOG.isTraceEnabled()) {
+554      LOG.trace("Start workers " + workerThreads.size());
+555    }
+556    timeoutExecutor.start();
+557    for (WorkerThread worker: workerThreads) {
+558      worker.start();
+559    }
+560
+561    // Internal chores
+562    timeoutExecutor.add(new WorkerMonitor());
+563
+564    // Add completed cleaner chore
+565    addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
+566  }
+567
+568  public void stop() {
+569    if (!running.getAndSet(false)) {
+570      return;
+571    }
+572
+573    LOG.info("Stopping");
+574    scheduler.stop();
+575    timeoutExecutor.sendStopSignal();
+576  }
+577
+578  @VisibleForTesting
+579  public void join() {
+580    assert !isRunning() : "expected not running";
+581
+582    // stop the timeout executor
+583    timeoutExecutor.awaitTermination();
+584    timeoutExecutor = null;
+585
+586    // stop the worker threads
+587    for (WorkerThread worker: workerThreads) {
+588      worker.awaitTermination();
+589    }
+590    workerThreads = null;
+591
+592    // Destroy the Thread Group for the executors
+593    try {
+594      threadGroup.destroy();
+595    } catch (IllegalThreadStateException e) {
+596      LOG.error("Thread group " + threadGroup + " contains running threads");
+597      threadGroup.list();
+598    } finally {
+599      threadGroup = null;
+600    }
+601
+602    // reset the in-memory state for testing
+603    completed.clear();
+604    rollbackStack.clear();
+605    procedures.clear();
+606    nonceKeysToProcIdsMap.clear();
+607    scheduler.clear();
+608    lastProcId.set(-1);
+609  }
+610
+611  public void refreshConfiguration(final Configuration conf) {
+612    this.conf = conf;
+613    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
+614        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
+615  }
+616
+617  // ==========================================================================
+618  //  Accessors
+619  // ==========================================================================
+620  public boolean isRunning() {
+621    return running.get();
+622  }
+623
+624  /**
+625   * @return the current number of worker threads.
+626   */
+627  public int getWorkerThreadCount() {
+628    return workerThreads.size();
+629  }
+630
+631  /**
+632   * @return the core pool size settings.
+633   */
+634  public int getCorePoolSize() {
+635    return corePoolSize;
+636  }
+637
+638  public int getActiveExecutorCount() {
+639    return activeExecutorCount.get();
+640  }
+641
+642  public TEnvironment getEnvironment() {
+643    return this.environment;
+644  }
+645
+646  public ProcedureStore getStore() {
+647    return this.store;
+648  }
+649
+650  protected ProcedureScheduler getScheduler() {
+651    return scheduler;
+652  }
+653
+654  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
+655    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
+656    this.scheduler.signalAll();
+657  }
+658
+659  public long getKeepAliveTime(final TimeUnit timeUnit) {
+660    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
+661  }
+662
+663  // ==========================================================================
+664  //  Submit/Remove Chores
+665  // ==========================================================================
+666
+667  /**
+668   * Add a chore procedure to the executor
+669   * @param chore the chore to add
+670   */
+671  public void addChore(final ProcedureInMemoryChore chore) {
+672    chore.setState(ProcedureState.WAITING_TIMEOUT);
+673    timeoutExecutor.add(chore);
+674  }
+675
+676  /**
+677   * Remove a chore procedure from the executor
+678   * @param chore the chore to remove
+679   * @return whether the chore is removed, or it will be removed later
+680   */
+681  public boolean removeChore(final ProcedureInMemoryChore chore) {
+682    chore.setState(ProcedureState.SUCCESS);
+683    return timeoutExecutor.remove(chore);
+684  }
+685
+686  // ==========================================================================
+687  //  Nonce Procedure helpers
+688  // ==========================================================================
+689  /**
+690   * Create a NonceKey from the specified nonceGroup and nonce.
+691   * @param nonceGroup
+692   * @param nonce
+693   * @return the generated NonceKey
+694   */
+695  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
+696    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
+697  }
+698
+699  /**
+700   * Register a nonce for a procedure that is going to be submitted.
+701   * A procId will be reserved and on submitProcedure(),
+702   * the procedure with the specified nonce will take the reserved ProcId.
+703   * If someone already reserved the nonce, this method will return the reserved procId;
+704   * otherwise an invalid procId will be returned, and the caller should proceed
+705   * and submit the procedure.
+706   *
+707   * @param nonceKey A unique identifier for this operation from the client or process.
+708   * @return the procId associated with the nonce, if any otherwise an invalid procId.
+709   */
+710  public long registerNonce(final NonceKey nonceKey) {
+711    if (nonceKey == null) return -1;
+712
+713    // check if we have already a Reserved ID for the nonce
+714    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
+715    if (oldProcId == null) {
+716      // reserve a new Procedure ID, this will be associated with the nonce
+717      // and the procedure submitted with the specified nonce will use this ID.
+718      final long newProcId = nextProcId();
+719      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
+720      if (oldProcId == null) return -1;
+721    }
+722
+723    // we found a registered nonce, but the procedure may not have been submitted yet.
+724    // since the client expects the procedure to be submitted, spin here until it is.
+725    final boolean traceEnabled = LOG.isTraceEnabled();
+726    while (isRunning() &&
+727           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
+728           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
+729      if (traceEnabled) {
+730        LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
+731      }
+732      Threads.sleep(100);
+733    }
+734    return oldProcId.longValue();
+735  }
+736
+737  /**
+738   * Remove the NonceKey if the procedure was not submitted to the executor.
+739   * @param nonceKey A unique identifier for this operation from the client or process.
+740   */
+741  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
+742    if (nonceKey == null) return;
+743
+744    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
+745    if (procId == null) return;
+746
+747    // if the procedure was not submitted, remove the nonce
+748    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
+749      nonceKeysToProcIdsMap.remove(nonceKey);
+750    }
+751  }
+752
+753  private static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
+754    private String procName;
+755
+756    public FailedProcedure(NonceKey nonceKey, String procName, User owner,
+757        IOException exception) {
+758      this.procName = procName;
+759      setNonceKey(nonceKey);
+760      setOwner(owner);
+761      setFailure(Objects.toString(exception.getMessage(), ""), exception);
+762    }
+763
+764    @Override
+765    public String getProcName() {
+766      return procName;
+767    }
+768
+769    @Override
+770    protected Procedure<TEnvironment>[] execute(TEnvironment env)
+771        throws ProcedureYieldException, ProcedureSuspendedException,
+772        InterruptedException {
+773      throw new UnsupportedOperationException();
+774    }
+775
+776    @Override
+777    protected void rollback(TEnvironment env)
+778        throws IOException, InterruptedException {
+779      throw new UnsupportedOperationException();
+780    }
+781
+782    @Override
+783    protected boolean abort(TEnvironment env) {
+784      throw new UnsupportedOperationException();
+785    }
+786
+787    @Override
+788    protected void serializeStateData(OutputStream stream) throws IOException {
+789    }
+790
+791    @Override
+792    protected void deserializeStateData(InputStream stream) throws IOException {
+793    }
+794  }
+795
+796  /**
+797   * If the procedure failed before it was submitted, we may want to give back the
+798   * same error to the requests with the same nonceKey.
+799   *
+800   * @param nonceKey A unique identifier for this operation from the client or process
+801   * @param procName name of the procedure, used to inform the user
+802   * @param procOwner the owner of the procedure, used to inform the user
+803   * @param exception the failure to report to the user
+804   */
+805  public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
+806      final User procOwner, final IOException exception) {
+807    if (nonceKey == null) return;
+808
+809    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
+810    if (procId == null || completed.containsKey(procId)) return;
+811
+812    Procedure proc = new FailedProcedure(nonceKey, procName, procOwner, exception);
+813    completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc));
+814  }
+815
+816  // ==========================================================================
+817  //  Submit/Abort Procedure
+818  // ==========================================================================
+819  /**
+820   * Add a new root-procedure to the executor.
+821   * @param proc the new procedure to execute.
+822   * @return the procedure id, that can be used to monitor the operation
+823   */
+824  public long submitProcedure(final Procedure proc) {
+825    return submitProcedure(proc, null);
+826  }
+827
+828  /**
+829   * Add a new root-procedure to the executor.
+830   * @param proc the new procedure to execute.
+831   * @param nonceKey the registered unique identifier for this operation from the client or process.
+832   * @return the procedure id, that can be used to monitor the operation
+833   */
+834  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+835      justification = "FindBugs is blind to the check-for-null")
+836  public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
+837    Preconditions.checkArgument(lastProcId.get() >= 0);
+838    Preconditions.checkArgument(isRunning(), "executor not running");
+839
+840    prepareProcedure(proc);
+841
+842    final Long currentProcId;
+843    if (nonceKey != null) {
+844      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
+845      Preconditions.checkArgument(currentProcId != null,
+846        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
+847    } else {
+848      currentProcId = nextProcId();
+849    }
+850
+851    // Initialize the procedure
+852    proc.setNonceKey(nonceKey);
+853    proc.setProcId(currentProcId.longValue());
+854
+855    // Commit the transaction
+856    store.insert(proc, null);
+857    if (LOG.isDebugEnabled()) {
+858      LOG.debug("Stored " + proc);
+859    }
+860
+861    // Add the procedure to the executor
+862    return pushProcedure(proc);
+863  }
+864
+865  /**
+866   * Add a set of new root-procedures to the executor.
+867   * @param procs the new procedures to execute.
+868   */
+869  // TODO: Do we need to take nonces here?
+870  public void submitProcedures(final Procedure[] procs) {
+871    Preconditions.checkArgument(lastProcId.get() >= 0);
+872    Preconditions.checkArgument(isRunning(), "executor not running");
+873
+874    // Prepare procedure
+875    for (int i = 0; i < procs.length; ++i) {
+876      prepareProcedure(procs[i]).setProcId(nextProcId());
+877    }
+878
+879    // Commit the transaction
+880    store.insert(procs);
+881    if (LOG.isDebugEnabled()) {
+882      LOG.debug("Stored " + Arrays.toString(procs));
+883    }
+884
+885    // Add the procedures to the executor
+886    for (int i = 0; i < procs.length; ++i) {
+887      pushProcedure(procs[i]);
+888    }
+889  }
+890
+891  private Procedure prepareProcedure(final Procedure proc) {
+892    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
+893    Preconditions.checkArgument(isRunning(), "executor not running");
+894    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
+895    if (this.checkOwnerSet) {
+896      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
+897    }
+898    return proc;
+899  }
+900
+901  private long pushProcedure(final Procedure proc) {
+902    final long currentProcId = proc.getProcId();
+903
+904    // Update metrics on start of a procedure
+905    proc.updateMetricsOnSubmit(getEnvironment());
+906
+907    // Create the rollback stack for the procedure
+908    RootProcedureState stack = new RootProcedureState();
+909    rollbackStack.put(currentProcId, stack);
+910