hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [4/4] hbase git commit: HBASE-16744 Procedure V2 - Lock procedures to allow clients to acquire locks on tables/namespaces/regions (Matteo Bertozzi)
Date Sat, 14 Jan 2017 05:07:27 GMT
HBASE-16744 Procedure V2 - Lock procedures to allow clients to acquire
locks on tables/namespaces/regions (Matteo Bertozzi)

Incorporates review comments from
    https://reviews.apache.org/r/52589/
    https://reviews.apache.org/r/54388/

M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
 Fix for eclipse complaint (from Duo Zhang)

M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
 Log formatting

M hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
 Added wait procedures utility.

A hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/LockServiceProtos.java
A hbase-protocol-shaded/src/main/protobuf/LockService.proto
 Implement new locking CP overrides.

A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java
 New hbase entity lock (ns, table, or regions)

A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/LockServiceClient.java
 Client that can use the new internal locking service.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4cb09a49
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4cb09a49
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4cb09a49

Branch: refs/heads/master
Commit: 4cb09a494c4148de2b4e8c6cd011bacdf7f33b1a
Parents: 9fd5dab1
Author: Michael Stack <stack@apache.org>
Authored: Wed Jan 11 14:38:59 2017 -0800
Committer: Michael Stack <stack@apache.org>
Committed: Fri Jan 13 21:07:03 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncTableBase.java     |    4 +-
 .../hadoop/hbase/procedure2/Procedure.java      |   10 +-
 .../hbase/procedure2/ProcedureExecutor.java     |   98 +-
 .../procedure2/store/wal/WALProcedureStore.java |    8 +-
 .../procedure2/ProcedureTestingUtility.java     |   12 +
 .../protobuf/generated/LockServiceProtos.java   | 5328 ++++++++++++++++++
 .../src/main/protobuf/LockService.proto         |   79 +
 .../hbase/rsgroup/RSGroupAdminEndpoint.java     |   54 +-
 .../hadoop/hbase/client/locking/EntityLock.java |  266 +
 .../hbase/client/locking/LockServiceClient.java |  111 +
 .../BaseMasterAndRegionObserver.java            |   23 +
 .../hbase/coprocessor/BaseMasterObserver.java   |   25 +-
 .../hbase/coprocessor/MasterObserver.java       |   36 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |    9 +-
 .../hbase/master/MasterCoprocessorHost.java     |   43 +
 .../hadoop/hbase/master/MasterRpcServices.java  |   68 +-
 .../hadoop/hbase/master/MasterServices.java     |    5 +
 .../hbase/master/locking/LockManager.java       |  271 +
 .../hbase/master/locking/LockProcedure.java     |  462 ++
 .../org/apache/hadoop/hbase/util/IdLock.java    |   12 +-
 .../hbase/client/locking/TestEntityLocks.java   |  182 +
 .../hbase/coprocessor/TestMasterObserver.java   |   64 +-
 .../hbase/master/MockNoopMasterServices.java    |    8 +-
 .../hadoop/hbase/master/MockRegionServer.java   |    1 +
 .../hbase/master/locking/TestLockManager.java   |  161 +
 .../hbase/master/locking/TestLockProcedure.java |  456 ++
 26 files changed, 7709 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index 19a22c0..d80627f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -386,8 +386,8 @@ public interface AsyncTableBase {
    * @return A list of {@link CompletableFuture}s that represent the existence for each get.
    */
   default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
-    return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
-        .collect(toList());
+    return get(toCheckExistenceOnly(gets)).stream().
+        <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index cb4ee47..3f3cf33 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -243,24 +243,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     final StringBuilder sb = new StringBuilder();
     toStringClassDetails(sb);
 
-    sb.append(" id=");
+    sb.append(", procId=");
     sb.append(getProcId());
 
     if (hasParent()) {
-      sb.append(" parent=");
+      sb.append(", parent=");
       sb.append(getParentProcId());
     }
 
     if (hasOwner()) {
-      sb.append(" owner=");
+      sb.append(", owner=");
       sb.append(getOwner());
     }
 
-    sb.append(" state=");
+    sb.append(", state=");
     toStringState(sb);
 
     if (hasException()) {
-      sb.append(" failed=" + getException());
+      sb.append(", failed=" + getException());
     }
 
     return sb;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index c65f3fb..d3b65e8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,7 +92,7 @@ public class ProcedureExecutor<TEnvironment> {
       final boolean kill = this.killBeforeStoreUpdate;
       if (this.toggleKillBeforeStoreUpdate) {
         this.killBeforeStoreUpdate = !kill;
-        LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
+        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
       }
       return kill;
     }
@@ -172,7 +173,7 @@ public class ProcedureExecutor<TEnvironment> {
 
       final long now = EnvironmentEdgeManager.currentTime();
       final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
-      final boolean isDebugEnabled = LOG.isDebugEnabled();
+      final boolean debugEnabled = LOG.isDebugEnabled();
       while (it.hasNext() && store.isRunning()) {
         final Map.Entry<Long, ProcedureInfo> entry = it.next();
         final ProcedureInfo procInfo = entry.getValue();
@@ -180,8 +181,8 @@ public class ProcedureExecutor<TEnvironment> {
         // TODO: Select TTL based on Procedure type
         if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
             (now - procInfo.getLastUpdate()) >= evictTtl) {
-          if (isDebugEnabled) {
-            LOG.debug("Evict completed procedure: " + procInfo);
+          if (debugEnabled) {
+            LOG.debug("Evict completed " + procInfo);
           }
           batchIds[batchCount++] = entry.getKey();
           if (batchCount == batchIds.length) {
@@ -281,7 +282,7 @@ public class ProcedureExecutor<TEnvironment> {
       @Override
       public void setMaxProcId(long maxProcId) {
         assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
-        LOG.debug("load procedures maxProcId=" + maxProcId);
+        LOG.debug("Load maxProcId=" + maxProcId);
         lastProcId.set(maxProcId);
       }
 
@@ -295,7 +296,7 @@ public class ProcedureExecutor<TEnvironment> {
         int corruptedCount = 0;
         while (procIter.hasNext()) {
           ProcedureInfo proc = procIter.nextAsProcedureInfo();
-          LOG.error("corrupted procedure: " + proc);
+          LOG.error("Corrupt " + proc);
           corruptedCount++;
         }
         if (abortOnCorruption && corruptedCount > 0) {
@@ -307,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void loadProcedures(final ProcedureIterator procIter,
       final boolean abortOnCorruption) throws IOException {
-    final boolean isDebugEnabled = LOG.isDebugEnabled();
+    final boolean debugEnabled = LOG.isDebugEnabled();
 
     // 1. Build the rollback stack
     int runnablesCount = 0;
@@ -320,8 +321,8 @@ public class ProcedureExecutor<TEnvironment> {
         nonceKey = proc.getNonceKey();
         procId = proc.getProcId();
         completed.put(proc.getProcId(), proc);
-        if (isDebugEnabled) {
-          LOG.debug("The procedure is completed: " + proc);
+        if (debugEnabled) {
+          LOG.debug("Completed " + proc);
         }
       } else {
         Procedure proc = procIter.nextAsProcedure();
@@ -361,8 +362,8 @@ public class ProcedureExecutor<TEnvironment> {
       Procedure proc = procIter.nextAsProcedure();
       assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
 
-      if (isDebugEnabled) {
-        LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
+      if (debugEnabled) {
+        LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
                     proc.getState(), proc.hasException(), proc));
       }
 
@@ -425,7 +426,7 @@ public class ProcedureExecutor<TEnvironment> {
       if (procStack.isValid()) continue;
 
       for (Procedure proc: procStack.getSubproceduresStack()) {
-        LOG.error("corrupted procedure: " + proc);
+        LOG.error("Corrupted " + proc);
         procedures.remove(proc.getProcId());
         runnableList.remove(proc);
         if (waitingSet != null) waitingSet.remove(proc);
@@ -485,7 +486,7 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting procedure executor threads=" + corePoolSize);
+    LOG.info("Starting executor threads=" + corePoolSize);
 
     // Create the Thread Group for the executors
     threadGroup = new ThreadGroup("ProcedureExecutor");
@@ -506,7 +507,7 @@ public class ProcedureExecutor<TEnvironment> {
     st = EnvironmentEdgeManager.currentTime();
     store.recoverLease();
     et = EnvironmentEdgeManager.currentTime();
-    LOG.info(String.format("recover procedure store (%s) lease: %s",
+    LOG.info(String.format("Recover store (%s) lease: %s",
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
     // start the procedure scheduler
@@ -520,11 +521,11 @@ public class ProcedureExecutor<TEnvironment> {
     st = EnvironmentEdgeManager.currentTime();
     load(abortOnCorruption);
     et = EnvironmentEdgeManager.currentTime();
-    LOG.info(String.format("load procedure store (%s): %s",
+    LOG.info(String.format("Load store (%s): %s",
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
     // Start the executors. Here we must have the lastProcId set.
-    LOG.debug("start workers " + workerThreads.size());
+    LOG.debug("Start workers " + workerThreads.size());
     timeoutExecutor.start();
     for (WorkerThread worker: workerThreads) {
       worker.start();
@@ -542,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> {
       return;
     }
 
-    LOG.info("Stopping the procedure executor");
+    LOG.info("Stopping");
     scheduler.stop();
     timeoutExecutor.sendStopSignal();
   }
@@ -564,7 +565,7 @@ public class ProcedureExecutor<TEnvironment> {
     try {
       threadGroup.destroy();
     } catch (IllegalThreadStateException e) {
-      LOG.error("thread group " + threadGroup + " contains running threads");
+      LOG.error("Thread group " + threadGroup + " contains running threads");
       threadGroup.list();
     } finally {
       threadGroup = null;
@@ -693,12 +694,12 @@ public class ProcedureExecutor<TEnvironment> {
 
     // we found a registered nonce, but the procedure may not have been submitted yet.
     // since the client expect the procedure to be submitted, spin here until it is.
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    final boolean traceEnabled = LOG.isTraceEnabled();
     while (isRunning() &&
            !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
            nonceKeysToProcIdsMap.containsKey(nonceKey)) {
-      if (isTraceEnabled) {
-        LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
+      if (traceEnabled) {
+        LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
       }
       Threads.sleep(100);
     }
@@ -787,7 +788,7 @@ public class ProcedureExecutor<TEnvironment> {
     // Commit the transaction
     store.insert(proc, null);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Procedure " + proc + " added to the store.");
+      LOG.debug("Stored " + proc);
     }
 
     // Add the procedure to the executor
@@ -811,7 +812,7 @@ public class ProcedureExecutor<TEnvironment> {
     // Commit the transaction
     store.insert(procs);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Procedures added to the store: " + Arrays.toString(procs));
+      LOG.debug("Stored " + Arrays.toString(procs));
     }
 
     // Add the procedure to the executor
@@ -880,6 +881,14 @@ public class ProcedureExecutor<TEnvironment> {
     return procedures.get(procId);
   }
 
+  public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
+    final Procedure proc = getProcedure(procId);
+    if (clazz.isInstance(proc)) {
+      return (T)proc;
+    }
+    return null;
+  }
+
   public ProcedureInfo getResult(final long procId) {
     return completed.get(procId);
   }
@@ -917,7 +926,7 @@ public class ProcedureExecutor<TEnvironment> {
     if (result == null) {
       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
+        LOG.debug("procId=" + procId + " already removed by the cleaner.");
       }
       return;
     }
@@ -999,7 +1008,7 @@ public class ProcedureExecutor<TEnvironment> {
         try {
           listener.procedureLoaded(procId);
         } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
         }
       }
     }
@@ -1011,7 +1020,7 @@ public class ProcedureExecutor<TEnvironment> {
         try {
           listener.procedureAdded(procId);
         } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
         }
       }
     }
@@ -1023,7 +1032,7 @@ public class ProcedureExecutor<TEnvironment> {
         try {
           listener.procedureFinished(procId);
         } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
         }
       }
     }
@@ -1053,6 +1062,11 @@ public class ProcedureExecutor<TEnvironment> {
     return lastProcId.get();
   }
 
+  @VisibleForTesting
+  public Set<Long> getActiveProcIds() {
+    return procedures.keySet();
+  }
+
   private Long getRootProcedureId(Procedure proc) {
     return Procedure.getRootProcedureId(procedures, proc);
   }
@@ -1111,7 +1125,7 @@ public class ProcedureExecutor<TEnvironment> {
 
       if (proc.isSuccess()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Procedure completed in " +
+          LOG.debug("Completed in " +
               StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
         }
         // Finalize the procedure state
@@ -1203,7 +1217,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // Finalize the procedure state
-    LOG.info("Rolledback procedure " + rootProc +
+    LOG.info("Rolled back " + rootProc +
              " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
              " exception=" + exception.getMessage());
     procedureFinished(rootProc);
@@ -1220,7 +1234,7 @@ public class ProcedureExecutor<TEnvironment> {
       proc.doRollback(getEnvironment());
     } catch (IOException e) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("rollback attempt failed for " + proc, e);
+        LOG.debug("Roll back attempt failed for " + proc, e);
       }
       return false;
     } catch (InterruptedException e) {
@@ -1294,7 +1308,7 @@ public class ProcedureExecutor<TEnvironment> {
         isSuspended = true;
       } catch (ProcedureYieldException e) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
+          LOG.trace("Yield " + procedure + ": " + e.getMessage());
         }
         scheduler.yield(procedure);
         return;
@@ -1418,8 +1432,8 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // If this procedure is the last child awake the parent procedure
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
-    if (isTraceEnabled) {
+    final boolean traceEnabled = LOG.isTraceEnabled();
+    if (traceEnabled) {
       LOG.trace(parent + " child is done: " + procedure);
     }
 
@@ -1427,7 +1441,7 @@ public class ProcedureExecutor<TEnvironment> {
       parent.setState(ProcedureState.RUNNABLE);
       store.update(parent);
       scheduler.addFront(parent);
-      if (isTraceEnabled) {
+      if (traceEnabled) {
         LOG.trace(parent + " all the children finished their work, resume.");
       }
       return;
@@ -1438,7 +1452,7 @@ public class ProcedureExecutor<TEnvironment> {
       final Procedure procedure, final Procedure[] subprocs) {
     if (subprocs != null && !procedure.isFailed()) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
+        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
       }
       store.insert(procedure, subprocs);
     } else {
@@ -1464,7 +1478,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
+      LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
     }
 
     // NOTE: We don't call Thread.currentThread().interrupt()
@@ -1530,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
-      final boolean isTraceEnabled = LOG.isTraceEnabled();
+      final boolean traceEnabled = LOG.isTraceEnabled();
       long lastUpdate = EnvironmentEdgeManager.currentTime();
       while (isRunning() && keepAlive(lastUpdate)) {
         final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
@@ -1539,7 +1553,7 @@ public class ProcedureExecutor<TEnvironment> {
         store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
         executionStartTime.set(EnvironmentEdgeManager.currentTime());
         try {
-          if (isTraceEnabled) {
+          if (traceEnabled) {
             LOG.trace("Trying to start the execution of " + procedure);
           }
           executeProcedure(procedure);
@@ -1549,7 +1563,7 @@ public class ProcedureExecutor<TEnvironment> {
           executionStartTime.set(Long.MAX_VALUE);
         }
       }
-      LOG.debug("worker thread terminated " + this);
+      LOG.debug("Worker thread terminated " + this);
       workerThreads.remove(this);
     }
 
@@ -1691,7 +1705,7 @@ public class ProcedureExecutor<TEnvironment> {
           sendStopSignal();
           join(250);
           if (i > 0 && (i % 8) == 0) {
-            LOG.warn("waiting termination of thread " + getName() + ", " +
+            LOG.warn("Waiting termination of thread " + getName() + ", " +
               StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
           }
         }
@@ -1767,7 +1781,7 @@ public class ProcedureExecutor<TEnvironment> {
 
         // WARN the worker is stuck
         stuckCount++;
-        LOG.warn("found worker stuck " + worker +
+        LOG.warn("Worker stuck " + worker +
             " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
       }
       return stuckCount;
@@ -1785,7 +1799,7 @@ public class ProcedureExecutor<TEnvironment> {
         final WorkerThread worker = new WorkerThread(threadGroup);
         workerThreads.add(worker);
         worker.start();
-        LOG.debug("added a new worker thread " + worker);
+        LOG.debug("Added new worker thread " + worker);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 4465993..d4d5773 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -294,7 +294,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   @Override
   public void setRunningProcedureCount(final int count) {
-    LOG.debug("set running procedure count=" + count + " slots=" + slots.length);
+    LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
     this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
   }
 
@@ -326,7 +326,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         try {
           flushLogId = initOldLogs(oldLogs);
         } catch (FileNotFoundException e) {
-          LOG.warn("someone else is active and deleted logs. retrying.", e);
+          LOG.warn("Someone else is active and deleted logs. retrying.", e);
           oldLogs = getLogFiles();
           continue;
         }
@@ -334,7 +334,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         // Create new state-log
         if (!rollWriter(flushLogId + 1)) {
           // someone else has already created this log
-          LOG.debug("someone else has already created log " + flushLogId);
+          LOG.debug("Someone else has already created log " + flushLogId);
           continue;
         }
 
@@ -428,7 +428,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     try {
       periodicRoll();
     } catch (IOException e) {
-      LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e);
+      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 9edc711..8aa2088 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -206,6 +206,18 @@ public class ProcedureTestingUtility {
     }
   }
 
+  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
+    for (int i = 0; i < procIds.length; ++i) {
+      waitProcedure(procExecutor, procIds[i]);
+    }
+  }
+
+  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
+    for (long procId : procExecutor.getActiveProcIds()) {
+      waitProcedure(procExecutor, procId);
+    }
+  }
+
   public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
     int stableRuns = 0;
     while (stableRuns < 10) {


Mime
View raw message