hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [04/24] hbase git commit: HBASE-13202 Procedure v2 - core framework
Date Fri, 10 Apr 2015 18:10:38 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
new file mode 100644
index 0000000..13f7bfa
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+
+/**
+ * WAL implementation of the ProcedureStore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALProcedureStore implements ProcedureStore {
+  private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
+
+  public interface LeaseRecovery {
+    void recoverFileLease(FileSystem fs, Path path) throws IOException;
+  }
+
+  private static final int MAX_RETRIES_BEFORE_ABORT = 3;
+
+  private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
+  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
+
+  private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
+    new CopyOnWriteArrayList<ProcedureStoreListener>();
+
+  private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
+  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition waitCond = lock.newCondition();
+  private final Condition slotCond = lock.newCondition();
+  private final Condition syncCond = lock.newCondition();
+
+  private final LeaseRecovery leaseRecovery;
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final Path logDir;
+
+  private AtomicBoolean inSync = new AtomicBoolean(false);
+  private ArrayBlockingQueue<ByteSlot> slotsCache = null;
+  private Set<ProcedureWALFile> corruptedLogs = null;
+  private FSDataOutputStream stream = null;
+  private long totalSynced = 0;
+  private long flushLogId = 0;
+  private int slotIndex = 0;
+  private Thread syncThread;
+  private ByteSlot[] slots;
+  private int syncWaitMsec;
+
+  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
+      final LeaseRecovery leaseRecovery) {
+    this.fs = fs;
+    this.conf = conf;
+    this.logDir = logDir;
+    this.leaseRecovery = leaseRecovery;
+  }
+
+  @Override
+  public void start(int numSlots) throws IOException {
+    if (running.getAndSet(true)) {
+      return;
+    }
+
+    // Init buffer slots
+    slots = new ByteSlot[numSlots];
+    slotsCache = new ArrayBlockingQueue(numSlots, true);
+    while (slotsCache.remainingCapacity() > 0) {
+      slotsCache.offer(new ByteSlot());
+    }
+
+    // Tunings
+    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
+
+    // Init sync thread
+    syncThread = new Thread("WALProcedureStoreSyncThread") {
+      @Override
+      public void run() {
+        while (running.get()) {
+          try {
+            syncLoop();
+          } catch (IOException e) {
+            LOG.error("got an exception from the sync-loop", e);
+            sendAbortProcessSignal();
+          }
+        }
+      }
+    };
+    syncThread.start();
+  }
+
+  @Override
+  public void stop(boolean abort) {
+    if (!running.getAndSet(false)) {
+      return;
+    }
+
+    LOG.info("Stopping the WAL Procedure Store");
+    if (lock.tryLock()) {
+      try {
+        waitCond.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    if (!abort) {
+      try {
+        syncThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // Close the writer
+    closeStream();
+
+    // Close the old logs
+    // they should be already closed, this is just in case the load fails
+    // and we call start() and then stop()
+    for (ProcedureWALFile log: logs) {
+      log.close();
+    }
+    logs.clear();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  @Override
+  public int getNumThreads() {
+    return slots == null ? 0 : slots.length;
+  }
+
+  public ProcedureStoreTracker getStoreTracker() {
+    return storeTracker;
+  }
+
+  @Override
+  public void registerListener(ProcedureStoreListener listener) {
+    this.listeners.add(listener);
+  }
+
+  @Override
+  public boolean unregisterListener(ProcedureStoreListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    LOG.info("Starting WAL Procedure Store lease recovery");
+    FileStatus[] oldLogs = getLogFiles();
+    while (running.get()) {
+      // Get Log-MaxID and recover lease on old logs
+      flushLogId = initOldLogs(oldLogs) + 1;
+
+      // Create new state-log
+      if (!rollWriter(flushLogId)) {
+        // someone else has already created this log
+        LOG.debug("someone else has already created log " + flushLogId);
+        continue;
+      }
+
+      // We have the lease on the log
+      oldLogs = getLogFiles();
+      if (getMaxLogId(oldLogs) > flushLogId) {
+        // Someone else created new logs
+        LOG.debug("someone else created new logs. expected maxLogId < " + flushLogId);
+        logs.getLast().removeFile();
+        continue;
+      }
+
+      LOG.info("lease acquired flushLogId=" + flushLogId);
+      break;
+    }
+  }
+
+  @Override
+  public Iterator<Procedure> load() throws IOException {
+    if (logs.isEmpty()) {
+      throw new RuntimeException("recoverLease() must be called before loading data");
+    }
+
+    // Nothing to do, If we have only the current log.
+    if (logs.size() == 1) {
+      LOG.debug("No state logs to replay");
+      return null;
+    }
+
+    // Load the old logs
+    final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
+    Iterator<ProcedureWALFile> it = logs.descendingIterator();
+    it.next(); // Skip the current log
+    try {
+      return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+        @Override
+        public void removeLog(ProcedureWALFile log) {
+          toRemove.add(log);
+        }
+
+        @Override
+        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
+          if (corruptedLogs == null) {
+            corruptedLogs = new HashSet<ProcedureWALFile>();
+          }
+          corruptedLogs.add(log);
+          // TODO: sideline corrupted log
+        }
+      });
+    } finally {
+      if (!toRemove.isEmpty()) {
+        for (ProcedureWALFile log: toRemove) {
+          removeLogFile(log);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void insert(final Procedure proc, final Procedure[] subprocs) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs));
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the insert
+      if (subprocs != null) {
+        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
+      } else {
+        assert !proc.hasParent();
+        ProcedureWALFormat.writeInsert(slot, proc);
+      }
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
+                " subprocs=" + Arrays.toString(subprocs), e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    // Update the store tracker
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.insert(proc, subprocs);
+      }
+    }
+  }
+
+  @Override
+  public void update(final Procedure proc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("update " + proc);
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the update
+      ProcedureWALFormat.writeUpdate(slot, proc);
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize the procedure: " + proc, e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    // Update the store tracker
+    boolean removeOldLogs = false;
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.update(proc);
+        removeOldLogs = storeTracker.isUpdated();
+      }
+    }
+
+    if (removeOldLogs) {
+      removeAllLogs(logId - 1);
+    }
+  }
+
+  @Override
+  public void delete(final long procId) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("delete " + procId);
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the delete
+      ProcedureWALFormat.writeDelete(slot, procId);
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize the procedure: " + procId, e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    boolean removeOldLogs = false;
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.delete(procId);
+        if (storeTracker.isEmpty()) {
+          removeOldLogs = rollWriterOrDie(logId + 1);
+        }
+      }
+    }
+
+    if (removeOldLogs) {
+      removeAllLogs(logId);
+    }
+  }
+
+  private ByteSlot acquireSlot() {
+    ByteSlot slot = slotsCache.poll();
+    return slot != null ? slot : new ByteSlot();
+  }
+
+  private void releaseSlot(final ByteSlot slot) {
+    slot.reset();
+    slotsCache.offer(slot);
+  }
+
+  private long pushData(final ByteSlot slot) {
+    assert !logs.isEmpty() : "recoverLease() must be called before inserting data";
+    long logId = -1;
+
+    lock.lock();
+    try {
+      // Wait for the sync to be completed
+      while (true) {
+        if (inSync.get()) {
+          syncCond.await();
+        } else if (slotIndex == slots.length) {
+          slotCond.signal();
+          syncCond.await();
+        } else {
+          break;
+        }
+      }
+
+      slots[slotIndex++] = slot;
+      logId = flushLogId;
+
+      // Notify that there is new data
+      if (slotIndex == 1) {
+        waitCond.signal();
+      }
+
+      // Notify that the slots are full
+      if (slotIndex == slots.length) {
+        slotCond.signal();
+      }
+      syncCond.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      sendAbortProcessSignal();
+    } finally {
+      lock.unlock();
+    }
+    return logId;
+  }
+
+  private void syncLoop() throws IOException {
+    inSync.set(false);
+    while (running.get()) {
+      lock.lock();
+      try {
+        // Wait until new data is available
+        if (slotIndex == 0) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced));
+          }
+          waitCond.await();
+          if (slotIndex == 0) {
+            // no data.. probably a stop()
+            continue;
+          }
+        }
+
+        // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+        slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+
+        inSync.set(true);
+        totalSynced += syncSlots();
+        slotIndex = 0;
+        inSync.set(false);
+        syncCond.signalAll();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        sendAbortProcessSignal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private long syncSlots() {
+    int retry = 0;
+    long totalSynced = 0;
+    do {
+      try {
+        totalSynced = syncSlots(stream, slots, 0, slotIndex);
+        break;
+      } catch (Throwable e) {
+        if (++retry == MAX_RETRIES_BEFORE_ABORT) {
+          LOG.error("sync slot failed, abort.", e);
+          sendAbortProcessSignal();
+        }
+      }
+    } while (running.get());
+    return totalSynced;
+  }
+
+  protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
+      throws IOException {
+    long totalSynced = 0;
+    for (int i = 0; i < count; ++i) {
+      ByteSlot data = slots[offset + i];
+      data.writeTo(stream);
+      totalSynced += data.size();
+    }
+    stream.hsync();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Sync slots=" + count + '/' + slots.length +
+                " flushed=" + StringUtils.humanSize(totalSynced));
+    }
+    return totalSynced;
+  }
+
+  private void sendAbortProcessSignal() {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureStoreListener listener : this.listeners) {
+        listener.abortProcess();
+      }
+    }
+  }
+
+  private boolean rollWriterOrDie(final long logId) {
+    try {
+      return rollWriter(logId);
+    } catch (IOException e) {
+      LOG.warn("Unable to roll the log", e);
+      sendAbortProcessSignal();
+      return false;
+    }
+  }
+
+  private boolean rollWriter(final long logId) throws IOException {
+    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
+      .setVersion(ProcedureWALFormat.HEADER_VERSION)
+      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
+      .setMinProcId(storeTracker.getMinProcId())
+      .setLogId(logId)
+      .build();
+
+    FSDataOutputStream newStream = null;
+    Path newLogFile = null;
+    long startPos = -1;
+    try {
+      newLogFile = getLogFilePath(logId);
+      newStream = fs.create(newLogFile, false);
+      ProcedureWALFormat.writeHeader(newStream, header);
+      startPos = newStream.getPos();
+    } catch (FileAlreadyExistsException e) {
+      LOG.error("Log file with id=" + logId + " already exists", e);
+      return false;
+    }
+    lock.lock();
+    try {
+      closeStream();
+      synchronized (storeTracker) {
+        storeTracker.resetUpdates();
+      }
+      stream = newStream;
+      flushLogId = logId;
+      totalSynced = 0;
+      logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
+    } finally {
+      lock.unlock();
+    }
+    LOG.info("Roll new state log: " + logId);
+    return true;
+  }
+
+  private void closeStream() {
+    try {
+      if (stream != null) {
+        try {
+          ProcedureWALFormat.writeTrailer(stream, storeTracker);
+        } catch (IOException e) {
+          LOG.warn("Unable to write the trailer: " + e.getMessage());
+        }
+        stream.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to close the stream", e);
+    } finally {
+      stream = null;
+    }
+  }
+
+  private void removeAllLogs(long lastLogId) {
+    LOG.info("Remove all state logs with ID less then " + lastLogId);
+    while (!logs.isEmpty()) {
+      ProcedureWALFile log = logs.getFirst();
+      if (lastLogId < log.getLogId()) {
+        break;
+      }
+
+      removeLogFile(log);
+    }
+  }
+
+  private boolean removeLogFile(final ProcedureWALFile log) {
+    try {
+      LOG.debug("remove log: " + log);
+      log.removeFile();
+      logs.remove(log);
+    } catch (IOException e) {
+      LOG.error("unable to remove log " + log, e);
+      return false;
+    }
+    return true;
+  }
+
+  public Set<ProcedureWALFile> getCorruptedLogs() {
+    return corruptedLogs;
+  }
+
+  // ==========================================================================
+  //  FileSystem Log Files helpers
+  // ==========================================================================
+  public Path getLogDir() {
+    return this.logDir;
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  protected Path getLogFilePath(final long logId) throws IOException {
+    return new Path(logDir, String.format("state-%020d.log", logId));
+  }
+
+  private static long getLogIdFromName(final String name) {
+    int end = name.lastIndexOf(".log");
+    int start = name.lastIndexOf('-') + 1;
+    while (start < end) {
+      if (name.charAt(start) != '0')
+        break;
+      start++;
+    }
+    return Long.parseLong(name.substring(start, end));
+  }
+
+  private FileStatus[] getLogFiles() throws IOException {
+    try {
+      return fs.listStatus(logDir, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String name = path.getName();
+          return name.startsWith("state-") && name.endsWith(".log");
+        }
+      });
+    } catch (FileNotFoundException e) {
+      LOG.warn("log directory not found: " + e.getMessage());
+      return null;
+    }
+  }
+
+  private long getMaxLogId(final FileStatus[] logFiles) {
+    long maxLogId = 0;
+    if (logFiles != null && logFiles.length > 0) {
+      for (int i = 0; i < logFiles.length; ++i) {
+        maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
+      }
+    }
+    return maxLogId;
+  }
+
+  /**
+   * @return Max-LogID of the specified log file set
+   */
+  private long initOldLogs(final FileStatus[] logFiles) throws IOException {
+    this.logs.clear();
+
+    long maxLogId = 0;
+    if (logFiles != null && logFiles.length > 0) {
+      for (int i = 0; i < logFiles.length; ++i) {
+        final Path logPath = logFiles[i].getPath();
+        leaseRecovery.recoverFileLease(fs, logPath);
+        maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
+
+        ProcedureWALFile log = initOldLog(logFiles[i]);
+        if (log != null) {
+          this.logs.add(log);
+        }
+      }
+      Collections.sort(this.logs);
+      initTrackerFromOldLogs();
+    }
+    return maxLogId;
+  }
+
+  private void initTrackerFromOldLogs() {
+    // TODO: Load the most recent tracker available
+    if (!logs.isEmpty()) {
+      ProcedureWALFile log = logs.getLast();
+      try {
+        log.readTracker(storeTracker);
+      } catch (IOException e) {
+        LOG.error("Unable to read tracker for " + log, e);
+        // try the next one...
+        storeTracker.clear();
+        storeTracker.setPartialFlag(true);
+      }
+    }
+  }
+
+  private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+    ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
+    if (logFile.getLen() == 0) {
+      LOG.warn("Remove uninitialized log " + logFile);
+      log.removeFile();
+      return null;
+    }
+
+    LOG.debug("opening state-log: " + logFile);
+    try {
+      log.open();
+    } catch (ProcedureWALFormat.InvalidWALDataException e) {
+      LOG.warn("Remove uninitialized log " + logFile, e);
+      log.removeFile();
+      return null;
+    } catch (IOException e) {
+      String msg = "Unable to read state log: " + logFile;
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (log.isCompacted()) {
+      try {
+        log.readTrailer();
+      } catch (IOException e) {
+        // unfinished compacted log throw it away
+        LOG.warn("Unfinished compacted log " + logFile, e);
+        log.removeFile();
+        return null;
+      }
+    }
+    return log;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
new file mode 100644
index 0000000..8904116
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Similar to the ByteArrayOutputStream, with the exception that we can prepend an header.
+ * e.g. you write some data and you want to prepend an header that contains the data len or cksum.
+ * <code>
+ * ByteSlot slot = new ByteSlot();
+ * // write data
+ * slot.write(...);
+ * slot.write(...);
+ * // write header with the size of the written data
+ * slot.markHead();
+ * slot.write(Bytes.toBytes(slot.size()));
+ * // flush to stream as [header, data]
+ * slot.writeTo(stream);
+ * </code>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ByteSlot extends OutputStream {
+  private static final int DOUBLE_GROW_LIMIT = 1 << 20;
+  private static final int GROW_ALIGN = 128;
+
+  private byte[] buf;
+  private int head;
+  private int size;
+
+  public void reset() {
+    head = 0;
+    size = 0;
+  }
+
+  public void markHead() {
+    head = size;
+  }
+
+  public int getHead() {
+    return head;
+  }
+
+  public int size() {
+    return size;
+  }
+
+  public byte[] getBuffer() {
+    return buf;
+  }
+
+  public void writeAt(int offset, int b) {
+    head = Math.min(head, offset);
+    buf[offset] = (byte)b;
+  }
+
+  public void write(int b) {
+    ensureCapacity(size + 1);
+    buf[size++] = (byte)b;
+  }
+
+  public void write(byte[] b, int off, int len) {
+    ensureCapacity(size + len);
+    System.arraycopy(b, off, buf, size, len);
+    size += len;
+  }
+
+  public void writeTo(final OutputStream stream) throws IOException {
+    if (head != 0) {
+      stream.write(buf, head, size - head);
+      stream.write(buf, 0, head);
+    } else {
+      stream.write(buf, 0, size);
+    }
+  }
+
+  private void ensureCapacity(int minCapacity) {
+    minCapacity = (minCapacity + (GROW_ALIGN - 1)) & -GROW_ALIGN;
+    if (buf == null) {
+      buf = new byte[minCapacity];
+    } else if (minCapacity > buf.length) {
+      int newCapacity = buf.length << 1;
+      if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) {
+        newCapacity = minCapacity;
+      }
+      buf = Arrays.copyOf(buf, newCapacity);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
new file mode 100644
index 0000000..97134c2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class StringUtils {
+  private StringUtils() {}
+
+  public static String humanTimeDiff(long timeDiff) {
+    StringBuilder buf = new StringBuilder();
+    long hours = timeDiff / (60*60*1000);
+    long rem = (timeDiff % (60*60*1000));
+    long minutes =  rem / (60*1000);
+    rem = rem % (60*1000);
+    float seconds = rem / 1000.0f;
+
+    if (hours != 0){
+      buf.append(hours);
+      buf.append("hrs, ");
+    }
+    if (minutes != 0){
+      buf.append(minutes);
+      buf.append("mins, ");
+    }
+    if (hours > 0 || minutes > 0) {
+      buf.append(seconds);
+      buf.append("sec");
+    } else {
+      buf.append(String.format("%.4fsec", seconds));
+    }
+    return buf.toString();
+  }
+
+  public static String humanSize(double size) {
+    if (size >= (1L << 40)) return String.format("%.1fT", size / (1L << 40));
+    if (size >= (1L << 30)) return String.format("%.1fG", size / (1L << 30));
+    if (size >= (1L << 20)) return String.format("%.1fM", size / (1L << 20));
+    if (size >= (1L << 10)) return String.format("%.1fK", size / (1L << 10));
+    return String.format("%.0f", size);
+  }
+
+  public static boolean isEmpty(final String input) {
+    return input == null || input.length() == 0;
+  }
+
+  public static String buildString(final String... parts) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < parts.length; ++i) {
+      sb.append(parts[i]);
+    }
+    return sb.toString();
+  }
+
+  public static StringBuilder appendStrings(final StringBuilder sb, final String... parts) {
+    for (int i = 0; i < parts.length; ++i) {
+      sb.append(parts[i]);
+    }
+    return sb;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
new file mode 100644
index 0000000..f710ef4
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TimeoutBlockingQueue<E> {
+  public static interface TimeoutRetriever<T> {
+    long getTimeout(T object);
+    TimeUnit getTimeUnit(T object);
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition waitCond = lock.newCondition();
+  private final TimeoutRetriever<? super E> timeoutRetriever;
+
+  private E[] objects;
+  private int head = 0;
+  private int tail = 0;
+
+  public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
+    this(32, timeoutRetriever);
+  }
+
+  @SuppressWarnings("unchecked")
+  public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
+    this.objects = (E[])new Object[capacity];
+    this.timeoutRetriever = timeoutRetriever;
+  }
+
+  public void dump() {
+    for (int i = 0; i < objects.length; ++i) {
+      if (i == head) {
+        System.out.print("[" + objects[i] + "] ");
+      } else if (i == tail) {
+        System.out.print("]" + objects[i] + "[ ");
+      } else {
+        System.out.print(objects[i] + " ");
+      }
+    }
+    System.out.println();
+  }
+
+  public void clear() {
+    lock.lock();
+    try {
+      if (head != tail) {
+        for (int i = head; i < tail; ++i) {
+          objects[i] = null;
+        }
+        head = 0;
+        tail = 0;
+        waitCond.signal();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void add(E e) {
+    if (e == null) throw new NullPointerException();
+
+    lock.lock();
+    try {
+      addElement(e);
+      waitCond.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  public E poll() {
+    lock.lock();
+    try {
+      if (isEmpty()) {
+        waitCond.await();
+        return null;
+      }
+
+      E elem = objects[head];
+      long nanos = getNanosTimeout(elem);
+      nanos = waitCond.awaitNanos(nanos);
+      return nanos > 0 ? null : removeFirst();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public int size() {
+    return tail - head;
+  }
+
+  public boolean isEmpty() {
+    return (tail - head) == 0;
+  }
+
+  public void signalAll() {
+    lock.lock();
+    try {
+      waitCond.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void addElement(E elem) {
+    int size = (tail - head);
+    if ((objects.length - size) == 0) {
+      int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
+      E[] newObjects = (E[])new Object[capacity];
+
+      if (compareTimeouts(objects[tail - 1], elem) <= 0) {
+        // Append
+        System.arraycopy(objects, head, newObjects, 0, tail);
+        tail -= head;
+        newObjects[tail++] = elem;
+      } else if (compareTimeouts(objects[head], elem) > 0) {
+        // Prepend
+        System.arraycopy(objects, head, newObjects, 1, tail);
+        newObjects[0] = elem;
+        tail -= (head - 1);
+      } else {
+        // Insert in the middle
+        int index = upperBound(head, tail - 1, elem);
+        int newIndex = (index - head);
+        System.arraycopy(objects, head, newObjects, 0, newIndex);
+        newObjects[newIndex] = elem;
+        System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
+        tail -= (head - 1);
+      }
+      head = 0;
+      objects = newObjects;
+    } else {
+      if (tail == objects.length) {
+        // shift down |-----AAAAAAA|
+        tail -= head;
+        System.arraycopy(objects, head, objects, 0, tail);
+        head = 0;
+      }
+
+      if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
+        // Append
+        objects[tail++] = elem;
+      } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
+        // Prepend
+        objects[--head] = elem;
+      } else {
+        // Insert in the middle
+        int index = upperBound(head, tail - 1, elem);
+        System.arraycopy(objects, index, objects, index + 1, tail - index);
+        objects[index] = elem;
+        tail++;
+      }
+    }
+  }
+
+  private E removeFirst() {
+    E elem = objects[head];
+    objects[head] = null;
+    head = (head + 1) % objects.length;
+    if (head == 0) tail = 0;
+    return elem;
+  }
+
+  private int upperBound(int start, int end, E key) {
+    while (start < end) {
+      int mid = (start + end) >>> 1;
+      E mitem = objects[mid];
+      int cmp = compareTimeouts(mitem, key);
+      if (cmp > 0) {
+        end = mid;
+      } else {
+        start = mid + 1;
+      }
+    }
+    return start;
+  }
+
+  private int compareTimeouts(final E a, final E b) {
+    long t1 = getNanosTimeout(a);
+    long t2 = getNanosTimeout(b);
+    return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
+  }
+
+  private long getNanosTimeout(final E obj) {
+    TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
+    long timeout = timeoutRetriever.getTimeout(obj);
+    return unit.toNanos(timeout);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
new file mode 100644
index 0000000..6e7306c
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProcedureTestingUtility {
+  private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
+
+  private ProcedureTestingUtility() {
+  }
+
+  public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
+      final Path baseDir) throws IOException {
+    return createWalStore(conf, fs, baseDir);
+  }
+
+  public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
+      final Path logDir) throws IOException {
+    return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+      @Override
+      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+        // no-op
+      }
+    });
+  }
+
+  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
+      throws Exception {
+    restart(procExecutor, null);
+  }
+
+  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
+      Runnable beforeStartAction) throws Exception {
+    ProcedureStore procStore = procExecutor.getStore();
+    int storeThreads = procExecutor.getNumThreads();
+    int execThreads = procExecutor.getNumThreads();
+    // stop
+    procExecutor.stop();
+    procStore.stop(false);
+    procExecutor.join();
+    // nothing running...
+    if (beforeStartAction != null) {
+      beforeStartAction.run();
+    }
+    // re-start
+    procStore.start(storeThreads);
+    procExecutor.start(execThreads);
+  }
+
+  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.killBeforeStoreUpdate = value;
+    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
+  }
+
+  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
+  }
+
+  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
+    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
+  }
+
+  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
+  }
+
+  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
+    long procId = procExecutor.submitProcedure(proc);
+    waitProcedure(procExecutor, procId);
+    return procId;
+  }
+
+  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
+    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
+      Threads.sleepWithoutInterrupt(250);
+    }
+  }
+
+  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
+    int stableRuns = 0;
+    while (stableRuns < 10) {
+      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
+        stableRuns = 0;
+        Threads.sleepWithoutInterrupt(100);
+      } else {
+        stableRuns++;
+        Threads.sleepWithoutInterrupt(25);
+      }
+    }
+  }
+
+  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
+      long procId) {
+    assertFalse("expected a running proc", procExecutor.isFinished(procId));
+    assertEquals(null, procExecutor.getResult(procId));
+  }
+
+  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
+      long procId) {
+    ProcedureResult result = procExecutor.getResult(procId);
+    assertTrue("expected procedure result", result != null);
+    assertProcNotFailed(result);
+  }
+
+  public static void assertProcNotFailed(final ProcedureResult result) {
+    Exception exception = result.getException();
+    String msg = exception != null ? exception.toString() : "no exception found";
+    assertFalse(msg, result.isFailed());
+  }
+
+  public static void assertIsAbortException(final ProcedureResult result) {
+    LOG.info(result.getException());
+    assertEquals(true, result.isFailed());
+    Throwable cause = result.getException().getCause();
+    assertTrue("expected abort exception, got "+ cause,
+        cause instanceof ProcedureAbortedException);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
new file mode 100644
index 0000000..7fe109e
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureExecution {
+  private static final Log LOG = LogFactory.getLog(TestProcedureExecution.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private ProcedureExecutor<Void> procExecutor;
+  private ProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  private static class TestProcedureException extends Exception {
+    public TestProcedureException(String msg) { super(msg); }
+  }
+
+  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
+    private final Procedure[] subProcs;
+    private final List<String> state;
+    private final Exception failure;
+    private final String name;
+
+    public TestSequentialProcedure() {
+      throw new UnsupportedOperationException("recovery should not be triggered here");
+    }
+
+    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
+      this.state = state;
+      this.subProcs = subProcs;
+      this.name = name;
+      this.failure = null;
+    }
+
+    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
+      this.state = state;
+      this.subProcs = null;
+      this.name = name;
+      this.failure = failure;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      state.add(name + "-execute");
+      if (failure != null) {
+        setFailure(new RemoteProcedureException(name + "-failure", failure));
+        return null;
+      }
+      return subProcs;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      state.add(name + "-rollback");
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      state.add(name + "-abort");
+      return true;
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testBadSubprocList() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
+    // failed state with 2 execute and 2 rollback
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+      result.getException().getCause() instanceof IllegalArgumentException);
+
+    assertEquals(state.toString(), 4, state.size());
+    assertEquals("rootProc-execute", state.get(0));
+    assertEquals("subProc1-execute", state.get(1));
+    assertEquals("subProc1-rollback", state.get(2));
+    assertEquals("rootProc-rollback", state.get(3));
+  }
+
+  @Test(timeout=30000)
+  public void testSingleSequentialProc() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+    // successful state, with 3 execute
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(state.toString(), 3, state.size());
+  }
+
+  @Test(timeout=30000)
+  public void testSingleSequentialProcRollback() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state,
+                                                     new TestProcedureException("fail test"));
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+    // the 3rd proc fail, rollback after 2 successful execution
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+      result.getException().getCause() instanceof TestProcedureException);
+
+    assertEquals(state.toString(), 6, state.size());
+    assertEquals("rootProc-execute", state.get(0));
+    assertEquals("subProc1-execute", state.get(1));
+    assertEquals("subProc2-execute", state.get(2));
+    assertEquals("subProc2-rollback", state.get(3));
+    assertEquals("subProc1-rollback", state.get(4));
+    assertEquals("rootProc-rollback", state.get(5));
+  }
+
+  public static class TestFaultyRollback extends SequentialProcedure<Void> {
+    private int retries = 0;
+
+    public TestFaultyRollback() { }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) throws IOException {
+      if (++retries < 3) {
+        LOG.info("inject rollback failure " + retries);
+        throw new IOException("injected failure number " + retries);
+      }
+      LOG.info("execute non faulty rollback step retries=" + retries);
+    }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+  }
+
+  @Test(timeout=30000)
+  public void testRollbackRetriableFailure() {
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
+
+    ProcedureResult result = procExecutor.getResult(procId);
+    LOG.info(result.getException());
+    assertTrue("expected a failure", result.isFailed());
+    assertTrue(result.getException().toString(),
+      result.getException().getCause() instanceof TestProcedureException);
+  }
+
+  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
+    private final List<String> state;
+    private final boolean hasChild;
+    private final String name;
+
+    public TestWaitingProcedure() {
+      throw new UnsupportedOperationException("recovery should not be triggered here");
+    }
+
+    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
+      this.hasChild = hasChild;
+      this.state = state;
+      this.name = name;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      state.add(name + "-execute");
+      setState(ProcedureState.WAITING_TIMEOUT);
+      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      state.add(name + "-rollback");
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      state.add(name + "-abort");
+      return true;
+    }
+
+    public static class TestWaitChild extends SequentialProcedure<Void> {
+      private final List<String> state;
+      private final String name;
+
+      public TestWaitChild() {
+        throw new UnsupportedOperationException("recovery should not be triggered here");
+      }
+
+      public TestWaitChild(String name, List<String> state) {
+        this.name = name;
+        this.state = state;
+      }
+
+      @Override
+      protected Procedure[] execute(Void env) {
+        state.add(name + "-child-execute");
+        return null;
+      }
+
+      @Override
+      protected void rollback(Void env) {
+        state.add(name + "-child-rollback");
+      }
+
+      @Override
+      protected boolean abort(Void env) {
+        state.add(name + "-child-abort");
+        return true;
+      }
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testAbortTimeout() {
+    final int PROC_TIMEOUT_MSEC = 2500;
+    List<String> state = new ArrayList<String>();
+    Procedure proc = new TestWaitingProcedure("wproc", state, false);
+    proc.setTimeout(PROC_TIMEOUT_MSEC);
+    long startTime = EnvironmentEdgeManager.currentTime();
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
+    LOG.info(state);
+    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+               result.getException().getCause() instanceof TimeoutException);
+    assertEquals(state.toString(), 2, state.size());
+    assertEquals("wproc-execute", state.get(0));
+    assertEquals("wproc-rollback", state.get(1));
+  }
+
+  @Test(timeout=30000)
+  public void testAbortTimeoutWithChildren() {
+    List<String> state = new ArrayList<String>();
+    Procedure proc = new TestWaitingProcedure("wproc", state, true);
+    proc.setTimeout(2500);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+               result.getException().getCause() instanceof TimeoutException);
+    assertEquals(state.toString(), 4, state.size());
+    assertEquals("wproc-execute", state.get(0));
+    assertEquals("wproc-child-execute", state.get(1));
+    assertEquals("wproc-child-rollback", state.get(2));
+    assertEquals("wproc-rollback", state.get(3));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
new file mode 100644
index 0000000..e36a295
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureFairRunQueues {
+  private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
+    private final int priority;
+    private final String name;
+
+    private boolean available = true;
+
+    public TestRunQueue(String name, int priority) {
+      this.name = name;
+      this.priority = priority;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+
+    private void setAvailable(boolean available) {
+      this.available = available;
+    }
+
+    @Override
+    public boolean isAvailable() {
+      return available;
+    }
+
+    @Override
+    public int getPriority() {
+      return priority;
+    }
+  }
+
+  @Test
+  public void testEmptyFairQueues() throws Exception {
+    ProcedureFairRunQueues<String, TestRunQueue> fairq
+      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(null, fairq.poll());
+    }
+  }
+
+  @Test
+  public void testFairQueues() throws Exception {
+    ProcedureFairRunQueues<String, TestRunQueue> fairq
+      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(a, fairq.poll());
+      assertEquals(b, fairq.poll());
+      assertEquals(m, fairq.poll());
+      assertEquals(m, fairq.poll());
+    }
+  }
+
+  @Test
+  public void testFairQueuesNotAvailable() throws Exception {
+    ProcedureFairRunQueues<String, TestRunQueue> fairq
+      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+    // m is not available
+    m.setAvailable(false);
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(a, fairq.poll());
+      assertEquals(b, fairq.poll());
+    }
+
+    // m is available
+    m.setAvailable(true);
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(m, fairq.poll());
+      assertEquals(m, fairq.poll());
+      assertEquals(a, fairq.poll());
+      assertEquals(b, fairq.poll());
+    }
+
+    // b is not available
+    b.setAvailable(false);
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(m, fairq.poll());
+      assertEquals(m, fairq.poll());
+      assertEquals(a, fairq.poll());
+    }
+
+    assertEquals(m, fairq.poll());
+    m.setAvailable(false);
+    // m should be fetched next, but is no longer available
+    assertEquals(a, fairq.poll());
+    assertEquals(a, fairq.poll());
+    b.setAvailable(true);
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(b, fairq.poll());
+      assertEquals(a, fairq.poll());
+    }
+  }
+
+  @Test
+  public void testFairQueuesDelete() throws Exception {
+    ProcedureFairRunQueues<String, TestRunQueue> fairq
+      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+    // Fetch A and then remove it
+    assertEquals(a, fairq.poll());
+    assertEquals(a, fairq.remove("A"));
+
+    // Fetch B and then remove it
+    assertEquals(b, fairq.poll());
+    assertEquals(b, fairq.remove("B"));
+
+    // Fetch M and then remove it
+    assertEquals(m, fairq.poll());
+    assertEquals(m, fairq.remove("M"));
+
+    // nothing left
+    assertEquals(null, fairq.poll());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
new file mode 100644
index 0000000..0b7395b
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureRecovery {
+  private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private static ProcedureExecutor<Void> procExecutor;
+  private static ProcedureStore procStore;
+  private static int procSleepInterval;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procExecutor.testing = new ProcedureExecutor.Testing();
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+    procSleepInterval = 0;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  private void restart() throws Exception {
+    dumpLogDirState();
+    ProcedureTestingUtility.restart(procExecutor);
+    dumpLogDirState();
+  }
+
+  public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+    private int step = 0;
+
+    public TestSingleStepProcedure() { }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      LOG.debug("execute procedure " + this + " step=" + step);
+      step++;
+      setResult(Bytes.toBytes(step));
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return true; }
+  }
+
+  public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+    private AtomicBoolean abort = new AtomicBoolean(false);
+    private int step = 0;
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      LOG.debug("execute procedure " + this + " step=" + step);
+      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+      step++;
+      Threads.sleepWithoutInterrupt(procSleepInterval);
+      if (isAborted()) {
+        setFailure(new RemoteProcedureException(getClass().getName(),
+          new ProcedureAbortedException(
+            "got an abort at " + getClass().getName() + " step=" + step)));
+        return null;
+      }
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      LOG.debug("rollback procedure " + this + " step=" + step);
+      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+      step++;
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      abort.set(true);
+      return true;
+    }
+
+    private boolean isAborted() {
+      boolean aborted = abort.get();
+      BaseTestStepProcedure proc = this;
+      while (proc.hasParent() && !aborted) {
+        proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
+        aborted = proc.isAborted();
+      }
+      return aborted;
+    }
+  }
+
+  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
+    public TestMultiStepProcedure() { }
+
+    @Override
+    public Procedure[] execute(Void env) {
+      super.execute(env);
+      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
+    }
+
+    public static class Step1Procedure extends BaseTestStepProcedure {
+      public Step1Procedure() { }
+
+      @Override
+      protected Procedure[] execute(Void env) {
+        super.execute(env);
+        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
+      }
+    }
+
+    public static class Step2Procedure extends BaseTestStepProcedure {
+      public Step2Procedure() { }
+    }
+  }
+
+  @Test
+  public void testNoopLoad() throws Exception {
+    restart();
+  }
+
+  @Test(timeout=30000)
+  public void testSingleStepProcRecovery() throws Exception {
+    Procedure proc = new TestSingleStepProcedure();
+    procExecutor.testing.killBeforeStoreUpdate = true;
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+    procExecutor.testing.killBeforeStoreUpdate = false;
+
+    // Restart and verify that the procedures restart
+    long restartTs = EnvironmentEdgeManager.currentTime();
+    restart();
+    waitProcedure(procId);
+    ProcedureResult result = procExecutor.getResult(procId);
+    assertTrue(result.getLastUpdate() > restartTs);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(1, Bytes.toInt(result.getResult()));
+    long resultTs = result.getLastUpdate();
+
+    // Verify that after another restart the result is still there
+    restart();
+    result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(resultTs, result.getLastUpdate());
+    assertEquals(1, Bytes.toInt(result.getResult()));
+  }
+
+  @Test(timeout=30000)
+  public void testMultiStepProcRecovery() throws Exception {
+    // Step 0 - kill
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 0 exec && Step 1 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+  }
+
+  @Test(timeout=30000)
+  public void testMultiStepRollbackRecovery() throws Exception {
+    // Step 0 - kill
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 0 exec && Step 1 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec - rollback - kill
+    procSleepInterval = 2500;
+    restart();
+    assertTrue(procExecutor.abort(procId));
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+
+    // rollback - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // rollback - complete
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Restart the executor and get the result
+    restart();
+    waitProcedure(procId);
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertIsAbortException(result);
+  }
+
+  public static class TestStateMachineProcedure
+      extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+    enum State { STATE_1, STATE_2, STATE_3, DONE }
+
+    public TestStateMachineProcedure() {}
+
+    private AtomicBoolean aborted = new AtomicBoolean(false);
+    private int iResult = 0;
+
+    @Override
+    protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+      switch (state) {
+        case STATE_1:
+          LOG.info("execute step 1 " + this);
+          setNextState(State.STATE_2);
+          iResult += 3;
+          break;
+        case STATE_2:
+          LOG.info("execute step 2 " + this);
+          setNextState(State.STATE_3);
+          iResult += 5;
+          break;
+        case STATE_3:
+          LOG.info("execute step 3 " + this);
+          Threads.sleepWithoutInterrupt(procSleepInterval);
+          if (aborted.get()) {
+            LOG.info("aborted step 3 " + this);
+            setAbortFailure("test", "aborted");
+            break;
+          }
+          setNextState(State.DONE);
+          iResult += 7;
+          setResult(Bytes.toBytes(iResult));
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException();
+      }
+      return Flow.HAS_MORE_STATE;
+    }
+
+    @Override
+    protected void rollbackState(Void env, final State state) {
+      switch (state) {
+        case STATE_1:
+          LOG.info("rollback step 1 " + this);
+          break;
+        case STATE_2:
+          LOG.info("rollback step 2 " + this);
+          break;
+        case STATE_3:
+          LOG.info("rollback step 3 " + this);
+          break;
+        default:
+          throw new UnsupportedOperationException();
+      }
+    }
+
+    @Override
+    protected State getState(final int stateId) {
+      return State.values()[stateId];
+    }
+
+    @Override
+    protected int getStateId(final State state) {
+      return state.ordinal();
+    }
+
+    @Override
+    protected State getInitialState() {
+      return State.STATE_1;
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      aborted.set(true);
+      return true;
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+      super.serializeStateData(stream);
+      stream.write(Bytes.toBytes(iResult));
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+      super.deserializeStateData(stream);
+      byte[] data = new byte[4];
+      stream.read(data);
+      iResult = Bytes.toInt(data);
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testStateMachineRecovery() throws Exception {
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+    // Step 1 - kill
+    Procedure proc = new TestStateMachineProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && Step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec && step 3 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 3 exec
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(15, Bytes.toInt(result.getResult()));
+  }
+
+  @Test(timeout=30000)
+  public void testStateMachineRollbackRecovery() throws Exception {
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+    // Step 1 - kill
+    Procedure proc = new TestStateMachineProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && Step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec && step 3 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 3 exec - rollback step 3 - kill
+    procSleepInterval = 2500;
+    restart();
+    assertTrue(procExecutor.abort(procId));
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Rollback step 3 - rollback step 2 - kill
+    restart();
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+    // Rollback step 2 - step 1 - kill
+    restart();
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+    // Rollback step 1 - complete
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertIsAbortException(result);
+  }
+
+  private void waitProcedure(final long procId) {
+    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
+    dumpLogDirState();
+  }
+
+  private void dumpLogDirState() {
+    try {
+      FileStatus[] files = fs.listStatus(logDir);
+      if (files != null && files.length > 0) {
+        for (FileStatus file: files) {
+          assertTrue(file.toString(), file.isFile());
+          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
+        }
+      } else {
+        LOG.debug("no files under: " + logDir);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to dump " + logDir, e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
new file mode 100644
index 0000000..88645ed
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestProcedureReplayOrder {
+  private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
+
+  private static final Procedure NULL_PROC = null;
+
+  private ProcedureExecutor<Void> procExecutor;
+  private TestProcedureEnv procEnv;
+  private ProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
+
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procEnv = new TestProcedureEnv();
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procStore.start(24);
+    procExecutor.start(1);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  @Test(timeout=90000)
+  public void testSingleStepReplyOrder() throws Exception {
+    // avoid the procedure to be runnable
+    procEnv.setAcquireLock(false);
+
+    // submit the procedures
+    submitProcedures(16, 25, TestSingleStepProcedure.class);
+
+    // restart the executor and allow the procedures to run
+    ProcedureTestingUtility.restart(procExecutor, new Runnable() {
+      @Override
+      public void run() {
+        procEnv.setAcquireLock(true);
+      }
+    });
+
+    // wait the execution of all the procedures and
+    // assert that the execution order was sorted by procId
+    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
+    procEnv.assertSortedExecList();
+
+    // TODO: FIXME: This should be revisited
+  }
+
+  @Ignore
+  @Test(timeout=90000)
+  public void testMultiStepReplyOrder() throws Exception {
+    // avoid the procedure to be runnable
+    procEnv.setAcquireLock(false);
+
+    // submit the procedures
+    submitProcedures(16, 10, TestTwoStepProcedure.class);
+
+    // restart the executor and allow the procedures to run
+    ProcedureTestingUtility.restart(procExecutor, new Runnable() {
+      @Override
+      public void run() {
+        procEnv.setAcquireLock(true);
+      }
+    });
+
+    fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
+  }
+
+  private void submitProcedures(final int nthreads, final int nprocPerThread,
+      final Class<?> procClazz) throws Exception {
+    Thread[] submitThreads = new Thread[nthreads];
+    for (int i = 0; i < submitThreads.length; ++i) {
+      submitThreads[i] = new Thread() {
+        @Override
+        public void run() {
+          for (int i = 0; i < nprocPerThread; ++i) {
+            try {
+              procExecutor.submitProcedure((Procedure)procClazz.newInstance());
+            } catch (InstantiationException|IllegalAccessException e) {
+              LOG.error("unable to instantiate the procedure", e);
+              fail("failure during the proc.newInstance(): " + e.getMessage());
+            }
+          }
+        }
+      };
+    }
+
+    for (int i = 0; i < submitThreads.length; ++i) {
+      submitThreads[i].start();
+    }
+
+    for (int i = 0; i < submitThreads.length; ++i) {
+      submitThreads[i].join();
+    }
+  }
+
+  private static class TestProcedureEnv {
+    private ArrayList<Long> execList = new ArrayList<Long>();
+    private boolean acquireLock = true;
+
+    public void setAcquireLock(boolean acquireLock) {
+      this.acquireLock = acquireLock;
+    }
+
+    public boolean canAcquireLock() {
+      return acquireLock;
+    }
+
+    public void addToExecList(final Procedure proc) {
+      execList.add(proc.getProcId());
+    }
+
+    public ArrayList<Long> getExecList() {
+      return execList;
+    }
+
+    public void assertSortedExecList() {
+      LOG.debug("EXEC LIST: " + execList);
+      for (int i = 1; i < execList.size(); ++i) {
+        assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
+          execList.get(i-1) < execList.get(i));
+      }
+    }
+  }
+
+  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
+    public TestSingleStepProcedure() { }
+
+    @Override
+    protected Procedure[] execute(TestProcedureEnv env) {
+      LOG.debug("execute procedure " + this);
+      env.addToExecList(this);
+      return null;
+    }
+
+    protected boolean acquireLock(final TestProcedureEnv env) {
+      return env.canAcquireLock();
+    }
+
+    @Override
+    protected void rollback(TestProcedureEnv env) { }
+
+    @Override
+    protected boolean abort(TestProcedureEnv env) { return true; }
+  }
+
+  public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
+    public TestTwoStepProcedure() { }
+
+    @Override
+    protected Procedure[] execute(TestProcedureEnv env) {
+      LOG.debug("execute procedure " + this);
+      env.addToExecList(this);
+      return new Procedure[] { new TestSingleStepProcedure() };
+    }
+
+    protected boolean acquireLock(final TestProcedureEnv env) {
+      return true;
+    }
+
+    @Override
+    protected void rollback(TestProcedureEnv env) { }
+
+    @Override
+    protected boolean abort(TestProcedureEnv env) { return true; }
+  }
+}
\ No newline at end of file


Mime
View raw message