hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [26/59] [abbrv] hbase git commit: Revert "HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!
Date Fri, 26 May 2017 23:47:34 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index b648cf2..1bb6118 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -32,8 +32,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.DelayQueue;
@@ -115,11 +113,9 @@ public class ProcedureExecutor<TEnvironment> {
    * Internal cleaner that removes the completed procedure results after a TTL.
    * NOTE: This is a special case handled in timeoutLoop().
    *
-   * <p>Since the client code looks more or less like:
-   * <pre>
+   * Since the client code looks more or less like:
    *   procId = master.doOperation()
    *   while (master.getProcResult(procId) == ProcInProgress);
-   * </pre>
    * The master should not throw away the proc result as soon as the procedure is done
    * but should wait a result request from the client (see executor.removeResult(procId))
    * The client will call something like master.isProcDone() or master.getProcResult()
@@ -484,10 +480,10 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize);
+    LOG.info("Starting executor worker threads=" + corePoolSize);
 
     // Create the Thread Group for the executors
-    threadGroup = new ThreadGroup("ProcExecThrdGrp");
+    threadGroup = new ThreadGroup("ProcedureExecutor");
 
     // Create the timeout executor
     timeoutExecutor = new TimeoutExecutorThread(threadGroup);
@@ -1081,16 +1077,13 @@ public class ProcedureExecutor<TEnvironment> {
     final Long rootProcId = getRootProcedureId(proc);
     if (rootProcId == null) {
       // The 'proc' was ready to run but the root procedure was rolledback
-      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
       executeRollback(proc);
       return;
     }
 
     final RootProcedureState procStack = rollbackStack.get(rootProcId);
-    if (procStack == null) {
-      LOG.warn("RootProcedureState is null for " + proc.getProcId());
-      return;
-    }
+    if (procStack == null) return;
+
     do {
       // Try to acquire the execution
       if (!procStack.acquire(proc)) {
@@ -1104,7 +1097,6 @@ public class ProcedureExecutor<TEnvironment> {
               scheduler.yield(proc);
               break;
             case LOCK_EVENT_WAIT:
-              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
               procStack.unsetRollback();
               break;
             default:
@@ -1122,7 +1114,6 @@ public class ProcedureExecutor<TEnvironment> {
                 scheduler.yield(proc);
                 break;
               case LOCK_EVENT_WAIT:
-                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                 break;
               default:
                 throw new UnsupportedOperationException();
@@ -1134,21 +1125,16 @@ public class ProcedureExecutor<TEnvironment> {
 
       // Execute the procedure
       assert proc.getState() == ProcedureState.RUNNABLE : proc;
-      // Note that lock is NOT about concurrency but rather about ensuring
-      // ownership of a procedure of an entity such as a region or table
-      LockState lockState = acquireLock(proc);
-      switch (lockState) {
+      switch (acquireLock(proc)) {
         case LOCK_ACQUIRED:
           execProcedure(procStack, proc);
           releaseLock(proc, false);
           break;
         case LOCK_YIELD_WAIT:
-          LOG.info(lockState + " " + proc);
           scheduler.yield(proc);
           break;
         case LOCK_EVENT_WAIT:
-          // Someone will wake us up when the lock is available
-          LOG.debug(lockState + " " + proc);
+          // someone will wake us up when the lock is available
           break;
         default:
           throw new UnsupportedOperationException();
@@ -1164,7 +1150,10 @@ public class ProcedureExecutor<TEnvironment> {
       if (proc.isSuccess()) {
         // update metrics on finishing the procedure
         proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-        LOG.info("Finish " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
+        }
         // Finalize the procedure state
         if (proc.getProcId() == rootProcId) {
           procedureFinished(proc);
@@ -1189,7 +1178,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void releaseLock(final Procedure proc, final boolean force) {
     final TEnvironment env = getEnvironment();
-    // For how the framework works, we know that we will always have the lock
+    // for how the framework works, we know that we will always have the lock
     // when we call releaseLock(), so we can avoid calling proc.hasLock()
     if (force || !proc.holdLock(env)) {
       proc.doReleaseLock(env);
@@ -1204,8 +1193,6 @@ public class ProcedureExecutor<TEnvironment> {
   private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
     final Procedure rootProc = procedures.get(rootProcId);
     RemoteProcedureException exception = rootProc.getException();
-    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
-    // rolling back because the subprocedure does. Clarify.
     if (exception == null) {
       exception = procStack.getException();
       rootProc.setFailure(exception);
@@ -1282,7 +1269,7 @@ public class ProcedureExecutor<TEnvironment> {
       return LockState.LOCK_YIELD_WAIT;
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
+      LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
     }
 
     // allows to kill the executor before something is stored to the wal.
@@ -1318,55 +1305,29 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
-   * Executes <code>procedure</code>
-   * <ul>
-   *  <li>Calls the doExecute() of the procedure
-   *  <li>If the procedure execution didn't fail (i.e. valid user input)
-   *  <ul>
-   *    <li>...and returned subprocedures
-   *    <ul><li>The subprocedures are initialized.
-   *      <li>The subprocedures are added to the store
-   *      <li>The subprocedures are added to the runnable queue
-   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
-   *    </ul>
-   *    </li>
-   *   <li>...if there are no subprocedure
-   *    <ul><li>the procedure completed successfully
-   *      <li>if there is a parent (WAITING)
-   *      <li>the parent state will be set to RUNNABLE
-   *    </ul>
-   *   </li>
-   *  </ul>
-   *  </li>
-   *  <li>In case of failure
-   *  <ul>
-   *    <li>The store is updated with the new state</li>
-   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
-   *  </ul>
-   *  </li>
-   *  </ul>
+   * Executes the specified procedure
+   *  - calls the doExecute() of the procedure
+   *  - if the procedure execution didn't fail (e.g. invalid user input)
+   *     - ...and returned subprocedures
+   *        - the subprocedures are initialized.
+   *        - the subprocedures are added to the store
+   *        - the subprocedures are added to the runnable queue
+   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
+   *     - ...if there are no subprocedure
+   *        - the procedure completed successfully
+   *        - if there is a parent (WAITING)
+   *            - the parent state will be set to RUNNABLE
+   *  - in case of failure
+   *    - the store is updated with the new state
+   *    - the executor (caller of this method) will start the rollback of the procedure
    */
-  private void execProcedure(final RootProcedureState procStack,
-      final Procedure<TEnvironment> procedure) {
+  private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
 
-    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
-    // The exception is caught below and then we hurry to the exit without disturbing state. The
-    // idea is that the processing of this procedure will be unsuspended later by an external event
-    // such the report of a region open. TODO: Currently, its possible for two worker threads
-    // to be working on the same procedure concurrently (locking in procedures is NOT about
-    // concurrency but about tying an entity to a procedure; i.e. a region to a particular
-    // procedure instance). This can make for issues if both threads are changing state.
-    // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
-    // in RegionTransitionProcedure#reportTransition for example of Procedure putting
-    // itself back on the scheduler making it possible for two threads running against
-    // the one Procedure. Might be ok if they are both doing different, idempotent sections.
+    // Execute the procedure
     boolean suspended = false;
-
-    // Whether to 're-' -execute; run through the loop again.
     boolean reExecute = false;
-
-    Procedure<TEnvironment>[] subprocs = null;
+    Procedure[] subprocs = null;
     do {
       reExecute = false;
       try {
@@ -1375,20 +1336,14 @@ public class ProcedureExecutor<TEnvironment> {
           subprocs = null;
         }
       } catch (ProcedureSuspendedException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Suspend " + procedure);
-        }
         suspended = true;
       } catch (ProcedureYieldException e) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
+          LOG.trace("Yield " + procedure + ": " + e.getMessage());
         }
         scheduler.yield(procedure);
         return;
       } catch (InterruptedException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
-        }
         handleInterruptedException(procedure, e);
         scheduler.yield(procedure);
         return;
@@ -1402,26 +1357,14 @@ public class ProcedureExecutor<TEnvironment> {
       if (!procedure.isFailed()) {
         if (subprocs != null) {
           if (subprocs.length == 1 && subprocs[0] == procedure) {
-            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
-            // i.e. we go around this loop again rather than go back out on the scheduler queue.
+            // quick-shortcut for a state machine like procedure
             subprocs = null;
             reExecute = true;
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
-            }
           } else {
-            // Yield the current procedure, and make the subprocedure runnable
-            // subprocs may come back 'null'.
+            // yield the current procedure, and make the subprocedure runnable
             subprocs = initializeChildren(procStack, procedure, subprocs);
-            LOG.info("Initialized subprocedures=" +
-              (subprocs == null? null:
-                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
-                collect(Collectors.toList()).toString()));
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Added to timeoutExecutor " + procedure);
-          }
           timeoutExecutor.add(procedure);
         } else if (!suspended) {
           // No subtask, so we are done
@@ -1445,13 +1388,12 @@ public class ProcedureExecutor<TEnvironment> {
       // executor thread to stop. The statement following the method call below seems to check if
       // store is not running, to prevent scheduling children procedures, re-execution or yield
       // of this procedure. This may need more scrutiny and subsequent cleanup in future
-      //
-      // Commit the transaction even if a suspend (state may have changed). Note this append
-      // can take a bunch of time to complete.
+      // Commit the transaction
       updateStoreOnExec(procStack, procedure, subprocs);
 
       // if the store is not running we are aborting
       if (!store.isRunning()) return;
+
       // if the procedure is kind enough to pass the slot to someone else, yield
       if (procedure.isRunnable() && !suspended &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
@@ -1461,14 +1403,14 @@ public class ProcedureExecutor<TEnvironment> {
 
       assert (reExecute && subprocs == null) || !reExecute;
     } while (reExecute);
+
     // Submit the new subprocedures
     if (subprocs != null && !procedure.isFailed()) {
       submitChildrenProcedures(subprocs);
     }
 
-    // if the procedure is complete and has a parent, count down the children latch.
-    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
-    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
+    // if the procedure is complete and has a parent, count down the children latch
+    if (procedure.isFinished() && procedure.hasParent()) {
       countDownChildren(procStack, procedure);
     }
   }
@@ -1527,13 +1469,18 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // If this procedure is the last child awake the parent procedure
-    LOG.info("Finish suprocedure " + procedure);
-    if (parent.tryRunnable()) {
-      // If we succeeded in making the parent runnable -- i.e. all of its
-      // children have completed, move parent to front of the queue.
+    final boolean traceEnabled = LOG.isTraceEnabled();
+    if (traceEnabled) {
+      LOG.trace(parent + " child is done: " + procedure);
+    }
+
+    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
+      parent.setState(ProcedureState.RUNNABLE);
       store.update(parent);
       scheduler.addFront(parent);
-      LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing.");
+      if (traceEnabled) {
+        LOG.trace(parent + " all the children finished their work, resume.");
+      }
       return;
     }
   }
@@ -1622,10 +1569,9 @@ public class ProcedureExecutor<TEnvironment> {
   // ==========================================================================
   private final class WorkerThread extends StoppableThread {
     private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
-    private Procedure activeProcedure;
 
     public WorkerThread(final ThreadGroup group) {
-      super(group, "ProcExecWrkr-" + workerId.incrementAndGet());
+      super(group, "ProcExecWorker-" + workerId.incrementAndGet());
     }
 
     @Override
@@ -1635,49 +1581,29 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
+      final boolean traceEnabled = LOG.isTraceEnabled();
       long lastUpdate = EnvironmentEdgeManager.currentTime();
-      try {
-        while (isRunning() && keepAlive(lastUpdate)) {
-          this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
-          if (this.activeProcedure == null) continue;
-          int activeCount = activeExecutorCount.incrementAndGet();
-          int runningCount = store.setRunningProcedureCount(activeCount);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
-                " runningCount=" + runningCount + ", activeCount=" + activeCount);
-          }
-          executionStartTime.set(EnvironmentEdgeManager.currentTime());
-          try {
-            executeProcedure(this.activeProcedure);
-          } catch (AssertionError e) {
-            LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
-            throw e;
-          } finally {
-            activeCount = activeExecutorCount.decrementAndGet();
-            runningCount = store.setRunningProcedureCount(activeCount);
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
-                  " runningCount=" + runningCount + ", activeCount=" + activeCount);
-            }
-            this.activeProcedure = null;
-            lastUpdate = EnvironmentEdgeManager.currentTime();
-            executionStartTime.set(Long.MAX_VALUE);
+      while (isRunning() && keepAlive(lastUpdate)) {
+        final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+        if (procedure == null) continue;
+
+        store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
+        executionStartTime.set(EnvironmentEdgeManager.currentTime());
+        try {
+          if (traceEnabled) {
+            LOG.trace("Trying to start the execution of " + procedure);
           }
+          executeProcedure(procedure);
+        } finally {
+          store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
+          lastUpdate = EnvironmentEdgeManager.currentTime();
+          executionStartTime.set(Long.MAX_VALUE);
         }
-      } catch (Throwable t) {
-        LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
-      } finally {
-        LOG.debug("Worker terminated.");
       }
+      LOG.debug("Worker thread terminated " + this);
       workerThreads.remove(this);
     }
 
-    @Override
-    public String toString() {
-      Procedure<?> p = this.activeProcedure;
-      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
-    }
-
     /**
      * @return the time since the current procedure is running
      */
@@ -1691,15 +1617,14 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  /**
-   * Runs task on a period such as check for stuck workers.
-   * @see InlineChore
-   */
+  // ==========================================================================
+  //  Timeout Thread
+  // ==========================================================================
   private final class TimeoutExecutorThread extends StoppableThread {
     private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
 
     public TimeoutExecutorThread(final ThreadGroup group) {
-      super(group, "ProcExecTimeout");
+      super(group, "ProcedureTimeoutExecutor");
     }
 
     @Override
@@ -1709,7 +1634,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
-      final boolean traceEnabled = LOG.isTraceEnabled();
+      final boolean isTraceEnabled = LOG.isTraceEnabled();
       while (isRunning()) {
         final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
         if (task == null || task == DelayedUtil.DELAYED_POISON) {
@@ -1718,8 +1643,8 @@ public class ProcedureExecutor<TEnvironment> {
           continue;
         }
 
-        if (traceEnabled) {
-          LOG.trace("Executing " + task);
+        if (isTraceEnabled) {
+          LOG.trace("Trying to start the execution of " + task);
         }
 
         // execute the task
@@ -1740,8 +1665,6 @@ public class ProcedureExecutor<TEnvironment> {
 
     public void add(final Procedure procedure) {
       assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
-      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
-          ", timestamp=" + procedure.getTimeoutTimestamp());
       queue.add(new DelayedProcedure(procedure));
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
index b148dae..bdced10 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Special procedure used as a chore.
- * Instead of bringing the Chore class in (dependencies reason),
+ * instead of bringing the Chore class in (dependencies reason),
  * we reuse the executor timeout thread for this special case.
  *
  * The assumption is that procedure is used as hook to dispatch other procedures
@@ -43,7 +43,7 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
   protected abstract void periodicExecute(final TEnvironment env);
 
   @Override
-  protected Procedure<TEnvironment>[] execute(final TEnvironment env) {
+  protected Procedure[] execute(final TEnvironment env) {
     throw new UnsupportedOperationException();
   }
 
@@ -66,4 +66,4 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
   public void deserializeStateData(final InputStream stream) {
     throw new UnsupportedOperationException();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 233ef57..93d0d5d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -93,7 +93,7 @@ public interface ProcedureScheduler {
 
   /**
    * Mark the event as not ready.
-   * Procedures calling waitEvent() will be suspended.
+   * procedures calling waitEvent() will be suspended.
    * @param event the event to mark as suspended/not ready
    */
   void suspendEvent(ProcedureEvent event);
@@ -125,7 +125,6 @@ public interface ProcedureScheduler {
    * List lock queues.
    * @return the locks
    */
-  // TODO: This seems to be the wrong place to hang this method.
   List<LockInfo> listLocks();
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
deleted file mode 100644
index 8d5ff3c..0000000
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
-import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-
-import com.google.common.collect.ArrayListMultimap;
-
-/**
- * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
- * count threshold. Creates its own threadpool to run RPCs with timeout.
- * <ul>
- * <li>Each server queue has a dispatch buffer</li>
- * <li>Once the dispatch buffer reaches a threshold-size/time we send<li>
- * </ul>
- * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
- * call {@link #stop()}.
- */
-@InterfaceAudience.Private
-public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
-  private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class);
-
-  public static final String THREAD_POOL_SIZE_CONF_KEY =
-      "hbase.procedure.remote.dispatcher.threadpool.size";
-  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
-
-  public static final String DISPATCH_DELAY_CONF_KEY =
-      "hbase.procedure.remote.dispatcher.delay.msec";
-  private static final int DEFAULT_DISPATCH_DELAY = 150;
-
-  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
-      "hbase.procedure.remote.dispatcher.max.queue.size";
-  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
-
-  private final AtomicBoolean running = new AtomicBoolean(false);
-  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
-      new ConcurrentHashMap<TRemote, BufferNode>();
-
-  private final int operationDelay;
-  private final int queueMaxSize;
-  private final int corePoolSize;
-
-  private TimeoutExecutorThread timeoutExecutor;
-  private ThreadPoolExecutor threadPool;
-
-  protected RemoteProcedureDispatcher(Configuration conf) {
-    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
-    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
-    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
-  }
-
-  public boolean start() {
-    if (running.getAndSet(true)) {
-      LOG.warn("Already running");
-      return false;
-    }
-
-    LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
-      ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
-
-    // Create the timeout executor
-    timeoutExecutor = new TimeoutExecutorThread();
-    timeoutExecutor.start();
-
-    // Create the thread pool that will execute RPCs
-    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
-      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
-          getUncaughtExceptionHandler()));
-    return true;
-  }
-
-  public boolean stop() {
-    if (!running.getAndSet(false)) {
-      return false;
-    }
-
-    LOG.info("Stopping procedure remote dispatcher");
-
-    // send stop signals
-    timeoutExecutor.sendStopSignal();
-    threadPool.shutdownNow();
-    return true;
-  }
-
-  public void join() {
-    assert !running.get() : "expected not running";
-
-    // wait the timeout executor
-    timeoutExecutor.awaitTermination();
-    timeoutExecutor = null;
-
-    // wait for the thread pool to terminate
-    threadPool.shutdownNow();
-    try {
-      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
-        LOG.warn("Waiting for thread-pool to terminate");
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for thread-pool termination", e);
-    }
-  }
-
-  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
-    return new UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        LOG.warn("Failed to execute remote procedures " + t.getName(), e);
-      }
-    };
-  }
-
-  // ============================================================================================
-  //  Node Helpers
-  // ============================================================================================
-  /**
-   * Add a node that will be able to execute remote procedures
-   * @param key the node identifier
-   */
-  public void addNode(final TRemote key) {
-    assert key != null: "Tried to add a node with a null key";
-    final BufferNode newNode = new BufferNode(key);
-    nodeMap.putIfAbsent(key, newNode);
-  }
-
-  /**
-   * Add a remote rpc. Be sure to check result for successful add.
-   * @param key the node identifier
-   * @return True if we successfully added the operation.
-   */
-  public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) {
-    assert key != null : "found null key for node";
-    BufferNode node = nodeMap.get(key);
-    if (node == null) {
-      return false;
-    }
-    node.add(rp);
-    // Check our node still in the map; could have been removed by #removeNode.
-    return nodeMap.contains(node);
-  }
-
-  /**
-   * Remove a remote node
-   * @param key the node identifier
-   */
-  public boolean removeNode(final TRemote key) {
-    final BufferNode node = nodeMap.remove(key);
-    if (node == null) return false;
-    node.abortOperationsInQueue();
-    return true;
-  }
-
-  // ============================================================================================
-  //  Task Helpers
-  // ============================================================================================
-  protected Future<Void> submitTask(Callable<Void> task) {
-    return threadPool.submit(task);
-  }
-
-  protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
-    final FutureTask<Void> futureTask = new FutureTask(task);
-    timeoutExecutor.add(new DelayedTask(futureTask, delay, unit));
-    return futureTask;
-  }
-
-  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
-  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
-
-  /**
-   * Data structure with reference to remote operation.
-   */
-  public static abstract class RemoteOperation {
-    private final RemoteProcedure remoteProcedure;
-
-    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
-      this.remoteProcedure = remoteProcedure;
-    }
-
-    public RemoteProcedure getRemoteProcedure() {
-      return remoteProcedure;
-    }
-  }
-
-  /**
-   * Remote procedure reference.
-   * @param <TEnv>
-   * @param <TRemote>
-   */
-  public interface RemoteProcedure<TEnv, TRemote> {
-    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
-    void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
-    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
-  }
-
-  /**
-   * Account of what procedures are running on remote node.
-   * @param <TEnv>
-   * @param <TRemote>
-   */
-  public interface RemoteNode<TEnv, TRemote> {
-    TRemote getKey();
-    void add(RemoteProcedure<TEnv, TRemote> operation);
-    void dispatch();
-  }
-
-  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
-      final TRemote remote, final Set<RemoteProcedure> operations) {
-    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
-    for (RemoteProcedure proc: operations) {
-      RemoteOperation operation = proc.remoteCallBuild(env, remote);
-      requestByType.put(operation.getClass(), operation);
-    }
-    return requestByType;
-  }
-
-  protected <T extends RemoteOperation> List<T> fetchType(
-      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
-    return (List<T>)requestByType.removeAll(type);
-  }
-
-  // ============================================================================================
-  //  Timeout Helpers
-  // ============================================================================================
-  private final class TimeoutExecutorThread extends Thread {
-    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
-
-    public TimeoutExecutorThread() {
-      super("ProcedureDispatcherTimeoutThread");
-    }
-
-    @Override
-    public void run() {
-      while (running.get()) {
-        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
-        if (task == null || task == DelayedUtil.DELAYED_POISON) {
-          // the executor may be shutting down, and the task is just the shutdown request
-          continue;
-        }
-        if (task instanceof DelayedTask) {
-          threadPool.execute(((DelayedTask)task).getObject());
-        } else {
-          ((BufferNode)task).dispatch();
-        }
-      }
-    }
-
-    public void add(final DelayedWithTimeout delayed) {
-      queue.add(delayed);
-    }
-
-    public void remove(final DelayedWithTimeout delayed) {
-      queue.remove(delayed);
-    }
-
-    public void sendStopSignal() {
-      queue.add(DelayedUtil.DELAYED_POISON);
-    }
-
-    public void awaitTermination() {
-      try {
-        final long startTime = EnvironmentEdgeManager.currentTime();
-        for (int i = 0; isAlive(); ++i) {
-          sendStopSignal();
-          join(250);
-          if (i > 0 && (i % 8) == 0) {
-            LOG.warn("Waiting termination of thread " + getName() + ", " +
-              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.warn(getName() + " join wait got interrupted", e);
-      }
-    }
-  }
-
-  // ============================================================================================
-  //  Internals Helpers
-  // ============================================================================================
-
-  /**
-   * Node that contains a set of RemoteProcedures
-   */
-  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
-      implements RemoteNode<TEnv, TRemote> {
-    private Set<RemoteProcedure> operations;
-
-    protected BufferNode(final TRemote key) {
-      super(key, 0);
-    }
-
-    public TRemote getKey() {
-      return getObject();
-    }
-
-    public synchronized void add(final RemoteProcedure operation) {
-      if (this.operations == null) {
-        this.operations = new HashSet<>();
-        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
-        timeoutExecutor.add(this);
-      }
-      this.operations.add(operation);
-      if (this.operations.size() > queueMaxSize) {
-        timeoutExecutor.remove(this);
-        dispatch();
-      }
-    }
-
-    public synchronized void dispatch() {
-      if (operations != null) {
-        remoteDispatch(getKey(), operations);
-        this.operations = null;
-      }
-    }
-
-    public synchronized void abortOperationsInQueue() {
-      if (operations != null) {
-        abortPendingOperations(getKey(), operations);
-        this.operations = null;
-      }
-    }
-
-    @Override
-    public String toString() {
-      return super.toString() + ", operations=" + this.operations;
-    }
-  }
-
-  /**
-   * Delayed object that holds a FutureTask.
-   * used to submit something later to the thread-pool.
-   */
-  private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
-    public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
-      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
index 64bb278..1a84070 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -27,13 +27,12 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData;
 
 /**
- * A SequentialProcedure describes one step in a procedure chain:
- * <pre>
+ * A SequentialProcedure describes one step in a procedure chain.
  *   -&gt; Step 1 -&gt; Step 2 -&gt; Step 3
- * </pre>
+ *
  * The main difference from a base Procedure is that the execute() of a
- * SequentialProcedure will be called only once; there will be no second
- * execute() call once the children are finished. which means once the child
+ * SequentialProcedure will be called only once, there will be no second
+ * execute() call once the child are finished. which means once the child
  * of a SequentialProcedure are completed the SequentialProcedure is completed too.
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index becd9b7..0590a93 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.procedure2;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMa
 /**
  * Procedure described by a series of steps.
  *
- * <p>The procedure implementor must have an enum of 'states', describing
+ * The procedure implementor must have an enum of 'states', describing
  * the various step of the procedure.
  * Once the procedure is running, the procedure-framework will call executeFromState()
  * using the 'state' provided by the user. The first call to executeFromState()
@@ -57,7 +56,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
   private int stateCount = 0;
   private int[] states = null;
 
-  private List<Procedure<TEnvironment>> subProcList = null;
+  private ArrayList<Procedure> subProcList = null;
 
   protected enum Flow {
     HAS_MORE_STATE,
@@ -71,7 +70,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
    *         Flow.HAS_MORE_STATE if there is another step.
    */
   protected abstract Flow executeFromState(TEnvironment env, TState state)
-  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
+    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
 
   /**
    * called to perform the rollback of the specified state
@@ -126,15 +125,12 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
    * Add a child procedure to execute
    * @param subProcedure the child procedure
    */
-  protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) {
-    if (subProcedure == null) return;
-    final int len = subProcedure.length;
-    if (len == 0) return;
+  protected void addChildProcedure(Procedure... subProcedure) {
     if (subProcList == null) {
-      subProcList = new ArrayList<>(len);
+      subProcList = new ArrayList<>(subProcedure.length);
     }
-    for (int i = 0; i < len; ++i) {
-      Procedure<TEnvironment> proc = subProcedure[i];
+    for (int i = 0; i < subProcedure.length; ++i) {
+      Procedure proc = subProcedure[i];
       if (!proc.hasOwner()) proc.setOwner(getOwner());
       subProcList.add(proc);
     }
@@ -142,23 +138,27 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
 
   @Override
   protected Procedure[] execute(final TEnvironment env)
-  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     updateTimestamp();
     try {
       failIfAborted();
 
       if (!hasMoreState() || isFailed()) return null;
+
       TState state = getCurrentState();
       if (stateCount == 0) {
         setNextState(getStateId(state));
       }
+
       stateFlow = executeFromState(env, state);
       if (!hasMoreState()) setNextState(EOF_STATE);
-      if (subProcList != null && !subProcList.isEmpty()) {
+
+      if (subProcList != null && subProcList.size() != 0) {
         Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
         subProcList = null;
         return subProcedures;
       }
+
       return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
     } finally {
       updateTimestamp();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index 9e53f42..c03e326 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public int setRunningProcedureCount(final int count) {
-    return count;
+  public void setRunningProcedureCount(final int count) {
+    // no-op
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index a690c81..385cedb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -153,9 +153,8 @@ public interface ProcedureStore {
   /**
    * Set the number of procedure running.
    * This can be used, for example, by the store to know how long to wait before a sync.
-   * @return how many procedures are running (may not be same as <code>count</code>).
    */
-  int setRunningProcedureCount(int count);
+  void setRunningProcedureCount(int count);
 
   /**
    * Acquire the lease for the procedure store.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 95a1ef6..012ddeb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -155,23 +155,9 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.logSize += size;
   }
 
-  public void removeFile(final Path walArchiveDir) throws IOException {
+  public void removeFile() throws IOException {
     close();
-    boolean archived = false;
-    if (walArchiveDir != null) {
-      Path archivedFile = new Path(walArchiveDir, logFile.getName());
-      LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
-      if (!fs.rename(logFile, archivedFile)) {
-        LOG.warn("Failed archive of " + logFile + ", deleting");
-      } else {
-        archived = true;
-      }
-    }
-    if (!archived) {
-      if (!fs.delete(logFile, false)) {
-        LOG.warn("Failed delete of " + logFile);
-      }
-    }
+    fs.delete(logFile, false);
   }
 
   public void setProcIds(long minId, long maxId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 0a05e6e..c672045 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -83,11 +83,11 @@ public class ProcedureWALFormatReader {
   //
   //  Fast Start: INIT/INSERT record and StackIDs
   // ---------------------------------------------
-  // We have two special records, INIT and INSERT, that track the first time
-  // the procedure was added to the WAL. We can use this information to be able
-  // to start procedures before reaching the end of the WAL, or before reading all WALs.
-  // But in some cases, the WAL with that record can be already gone.
-  // As an alternative, we can use the stackIds on each procedure,
+  // We have two special record, INIT and INSERT that tracks the first time
+  // the procedure was added to the WAL. We can use that information to be able
+  // to start procedures before reaching the end of the WAL, or before reading all the WALs.
+  // but in some cases the WAL with that record can be already gone.
+  // In alternative we can use the stackIds on each procedure,
   // to identify when a procedure is ready to start.
   // If there are gaps in the sum of the stackIds we need to read more WALs.
   //
@@ -107,16 +107,16 @@ public class ProcedureWALFormatReader {
    * Global tracker that will be used by the WALProcedureStore after load.
    * If the last WAL was closed cleanly we already have a full tracker ready to be used.
    * If the last WAL was truncated (e.g. master killed) the tracker will be empty
-   * and the 'partial' flag will be set. In this case, on WAL replay we are going
+   * and the 'partial' flag will be set. In this case on WAL replay we are going
    * to rebuild the tracker.
    */
   private final ProcedureStoreTracker tracker;
-  // TODO: private final boolean hasFastStartSupport;
+  // private final boolean hasFastStartSupport;
 
   /**
    * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
    * re-build the list of procedures updated in that WAL because we need it for log cleaning
-   * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
+   * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
    * (see {@link WALProcedureStore#removeInactiveLogs()}).
    * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
    * re-building it.
@@ -137,7 +137,7 @@ public class ProcedureWALFormatReader {
   public void read(final ProcedureWALFile log) throws IOException {
     localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
     if (localTracker != null) {
-      LOG.info("Rebuilding tracker for " + log);
+      LOG.info("Rebuilding tracker for log - " + log);
     }
 
     FSDataInputStream stream = log.getStream();
@@ -146,7 +146,7 @@ public class ProcedureWALFormatReader {
       while (hasMore) {
         ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
         if (entry == null) {
-          LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
+          LOG.warn("nothing left to decode. exiting with missing EOF");
           break;
         }
         switch (entry.getType()) {
@@ -171,7 +171,7 @@ public class ProcedureWALFormatReader {
         }
       }
     } catch (InvalidProtocolBufferException e) {
-      LOG.error("While reading procedure from " + log, e);
+      LOG.error("got an exception while reading the procedure WAL: " + log, e);
       loader.markCorruptedWAL(log, e);
     }
 
@@ -211,7 +211,7 @@ public class ProcedureWALFormatReader {
     maxProcId = Math.max(maxProcId, proc.getProcId());
     if (isRequired(proc.getProcId())) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
+        LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
       }
       localProcedureMap.add(proc);
       if (tracker.isPartial()) {
@@ -296,7 +296,7 @@ public class ProcedureWALFormatReader {
   //      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
   //
   //  We also have a lazy grouping by "root procedure", and a list of
-  //  unlinked procedures. If after reading all the WALs we have unlinked
+  //  unlinked procedure. If after reading all the WALs we have unlinked
   //  procedures it means that we had a missing WAL or a corruption.
   //      rootHead = A <-> D <-> G
   //                 B     E
@@ -639,17 +639,17 @@ public class ProcedureWALFormatReader {
      * "ready" means that we all the information that we need in-memory.
      *
      * Example-1:
-     * We have two WALs, we start reading from the newest (wal-2)
+     * We have two WALs, we start reading fronm the newest (wal-2)
      *    wal-2 | C B |
      *    wal-1 | A B C |
      *
      * If C and B don't depend on A (A is not the parent), we can start them
-     * before reading wal-1. If B is the only one with parent A we can start C.
-     * We have to read one more WAL before being able to start B.
+     * before reading wal-1. If B is the only one with parent A we can start C
+     * and read one more WAL before being able to start B.
      *
      * How do we know with the only information in B that we are not ready.
      *  - easy case, the parent is missing from the global map
-     *  - more complex case we look at the Stack IDs.
+     *  - more complex case we look at the Stack IDs
      *
      * The Stack-IDs are added to the procedure order as incremental index
      * tracking how many times that procedure was executed, which is equivalent
@@ -664,7 +664,7 @@ public class ProcedureWALFormatReader {
      * executed before.
      * To identify when a Procedure is ready we do the sum of the stackIds of
      * the procedure and the parent. if the stackIdSum is equals to the
-     * sum of {1..maxStackId} then everything we need is available.
+     * sum of {1..maxStackId} then everything we need is avaiable.
      *
      * Example-2
      *    wal-2 | A |              A stackIds = [0, 2]
@@ -676,7 +676,7 @@ public class ProcedureWALFormatReader {
       assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
 
       if (rootEntry.isFinished()) {
-        // If the root procedure is finished, sub-procedures should be gone
+        // if the root procedure is finished, sub-procedures should be gone
         if (rootEntry.childHead != null) {
           LOG.error("unexpected active children for root-procedure: " + rootEntry);
           for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 1791cae..4712c30 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -66,7 +66,6 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public class WALProcedureStore extends ProcedureStoreBase {
   private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
-  public static final String LOG_PREFIX = "pv2-";
 
   public interface LeaseRecovery {
     void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -125,7 +124,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final Configuration conf;
   private final FileSystem fs;
   private final Path walDir;
-  private final Path walArchiveDir;
 
   private final AtomicReference<Throwable> syncException = new AtomicReference<>();
   private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -187,15 +185,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
       final LeaseRecovery leaseRecovery) {
-    this(conf, fs, walDir, null, leaseRecovery);
-  }
-
-  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
-      final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
     this.fs = fs;
     this.conf = conf;
     this.walDir = walDir;
-    this.walArchiveDir = walArchiveDir;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -247,16 +239,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
     };
     syncThread.start();
-
-    // Create archive dir up front. Rename won't work w/o it up on HDFS.
-    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
-      if (this.fs.mkdirs(this.walArchiveDir)) {
-        if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
-            this.walArchiveDir);
-      } else {
-        LOG.warn("Failed create of " + this.walArchiveDir);
-      }
-    }
   }
 
   @Override
@@ -310,9 +292,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public int setRunningProcedureCount(final int count) {
+  public void setRunningProcedureCount(final int count) {
+    LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
     this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
-    return this.runningProcCount;
   }
 
   public ProcedureStoreTracker getStoreTracker() {
@@ -361,7 +343,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
           }
-          logs.getLast().removeFile(this.walArchiveDir);
+          logs.getLast().removeFile();
           continue;
         }
 
@@ -973,7 +955,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // but we should check if someone else has created new files
     if (getMaxLogId(getLogFiles()) > flushLogId) {
       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
-      logs.getLast().removeFile(this.walArchiveDir);
+      logs.getLast().removeFile();
       return false;
     }
 
@@ -1065,7 +1047,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
     // once there is nothing olding the oldest WAL we can remove it.
     while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
-      removeLogFile(logs.getFirst(), walArchiveDir);
+      removeLogFile(logs.getFirst());
       buildHoldingCleanupTracker();
     }
 
@@ -1097,8 +1079,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private void removeAllLogs(long lastLogId) {
     if (logs.size() <= 1) return;
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Remove all state logs with ID less than " + lastLogId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Remove all state logs with ID less than " + lastLogId);
     }
 
     boolean removed = false;
@@ -1107,7 +1089,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (lastLogId < log.getLogId()) {
         break;
       }
-      removeLogFile(log, walArchiveDir);
+      removeLogFile(log);
       removed = true;
     }
 
@@ -1116,15 +1098,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
+  private boolean removeLogFile(final ProcedureWALFile log) {
     try {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing log=" + log);
       }
-      log.removeFile(walArchiveDir);
+      log.removeFile();
       logs.remove(log);
       if (LOG.isDebugEnabled()) {
-        LOG.info("Removed log=" + log + ", activeLogs=" + logs);
+        LOG.info("Removed log=" + log + " activeLogs=" + logs);
       }
       assert logs.size() > 0 : "expected at least one log";
     } catch (IOException e) {
@@ -1146,7 +1128,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   protected Path getLogFilePath(final long logId) throws IOException {
-    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
+    return new Path(walDir, String.format("state-%020d.log", logId));
   }
 
   private static long getLogIdFromName(final String name) {
@@ -1159,7 +1141,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     @Override
     public boolean accept(Path path) {
       String name = path.getName();
-      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
+      return name.startsWith("state-") && name.endsWith(".log");
     }
   };
 
@@ -1210,7 +1192,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
 
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
-        ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
+        ProcedureWALFile log = initOldLog(logFiles[i]);
         if (log != null) {
           this.logs.add(log);
         }
@@ -1240,22 +1222,21 @@ public class WALProcedureStore extends ProcedureStoreBase {
   /**
    * Loads given log file and it's tracker.
    */
-  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
-  throws IOException {
+  private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
     final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
       LOG.warn("Remove uninitialized log: " + logFile);
-      log.removeFile(walArchiveDir);
+      log.removeFile();
       return null;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening Pv2 " + logFile);
+      LOG.debug("Opening state-log: " + logFile);
     }
     try {
       log.open();
     } catch (ProcedureWALFormat.InvalidWALDataException e) {
       LOG.warn("Remove uninitialized log: " + logFile, e);
-      log.removeFile(walArchiveDir);
+      log.removeFile();
       return null;
     } catch (IOException e) {
       String msg = "Unable to read state log: " + logFile;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index faf8e7e..cde37bd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-// FIX namings. TODO.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public final class DelayedUtil {
@@ -149,9 +148,6 @@ public final class DelayedUtil {
     }
   }
 
-  /**
-   * Has a timeout.
-   */
   public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
     private long timeout;
 
@@ -169,4 +165,4 @@ public final class DelayedUtil {
       this.timeout = timeout;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
index 78daf5a..408cffd 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
@@ -42,7 +42,7 @@ public class TestProcedureToString {
    */
   static class BasicProcedure extends Procedure<BasicProcedureEnv> {
     @Override
-    protected Procedure<BasicProcedureEnv>[] execute(BasicProcedureEnv env)
+    protected Procedure<?>[] execute(BasicProcedureEnv env)
         throws ProcedureYieldException, InterruptedException {
       return new Procedure [] {this};
     }
@@ -78,6 +78,8 @@ public class TestProcedureToString {
     }
   }
 
+  
+
   /**
    * Test that I can override the toString for its state value.
    * @throws ProcedureYieldException


Mime
View raw message