hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [05/24] hbase git commit: HBASE-13202 Procedure v2 - core framework
Date Fri, 10 Apr 2015 07:51:40 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
new file mode 100644
index 0000000..0aebd5a
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Once a Procedure completes, the ProcedureExecutor takes all the useful
+ * information of the procedure (e.g. exception/result) and creates a ProcedureResult.
+ * The user of the Procedure framework will get the procedure result with
+ * procedureExecutor.getResult(procId).
+ * <p>
+ * An instance is built either from an exception or from a result byte array;
+ * each constructor sets the field it does not receive to null. As a consequence
+ * {@link #isFailed()} is true only when a non-null exception was supplied and
+ * {@link #hasResultData()} is true only when a non-null byte array was supplied.
+ * {@link #getResult()} returns the stored array itself, not a copy.
+ * <p>
+ * {@link #executionTime()} is simply lastUpdate - startTime, expressed in whatever
+ * unit the creator used for the two timestamps. The client ack time starts at -1
+ * and {@link #hasClientAckTime()} reports true only once it holds a positive value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureResult {
+  private final RemoteProcedureException exception;  // null when built from a result
+  private final long lastUpdate;
+  private final long startTime;
+  private final byte[] result;  // null when built from an exception
+
+  private long clientAckTime = -1;  // -1 means "not set yet"
+
+  public ProcedureResult(final long startTime, final long lastUpdate,
+      final RemoteProcedureException exception) {
+    this.lastUpdate = lastUpdate;
+    this.startTime = startTime;
+    this.exception = exception;
+    this.result = null;  // a failed procedure carries no result data
+  }
+
+  public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) {
+    this.lastUpdate = lastUpdate;
+    this.startTime = startTime;
+    this.exception = null;  // a result-carrying instance is never reported as failed
+    this.result = result;
+  }
+
+  public boolean isFailed() {
+    return exception != null;
+  }
+
+  public RemoteProcedureException getException() {
+    return exception;
+  }
+
+  public boolean hasResultData() {
+    return result != null;
+  }
+
+  public byte[] getResult() {
+    return result;  // internal array, not a defensive copy
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getLastUpdate() {
+    return lastUpdate;
+  }
+
+  public long executionTime() {
+    return lastUpdate - startTime;
+  }
+
+  public boolean hasClientAckTime() {
+    return clientAckTime > 0;  // the initial -1 (and 0) count as "no ack recorded"
+  }
+
+  public long getClientAckTime() {
+    return clientAckTime;
+  }
+
+  @InterfaceAudience.Private
+  protected void setClientAckTime(final long timestamp) {
+    this.clientAckTime = timestamp;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
new file mode 100644
index 0000000..2d7ba39
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Keep track of the runnable procedures.
+ * <p>
+ * Procedures are handed to the set as Procedure objects, while {@link #poll()}
+ * gives back the ID of the procedure to execute.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ProcedureRunnableSet {
+  /**
+   * Inserts the specified procedure at the front of this queue.
+   * @param proc the Procedure to add
+   */
+  void addFront(Procedure proc);
+
+  /**
+   * Inserts the specified procedure at the end of this queue.
+   * @param proc the Procedure to add
+   */
+  void addBack(Procedure proc);
+
+  /**
+   * The procedure can't run at the moment.
+   * Add it back to the queue, giving priority to someone else.
+   * @param proc the Procedure to add back to the list
+   */
+  void yield(Procedure proc);
+
+  /**
+   * The procedure in execution completed.
+   * This can be implemented to perform cleanups.
+   * @param proc the Procedure that completed the execution.
+   */
+  void completionCleanup(Procedure proc);
+
+  /**
+   * Fetch one Procedure from the queue.
+   * An implementation may block here waiting for items; see {@link #signalAll()}.
+   * @return the ID of the Procedure to execute, or null if nothing is present.
+   */
+  Long poll();
+
+  /**
+   * In case the class is blocking on poll() waiting for items to be added,
+   * this method should awake poll() and poll() should return.
+   */
+  void signalAll();
+
+  /**
+   * Returns the number of elements in this collection.
+   * @return the number of elements in this collection.
+   */
+  int size();
+
+  /**
+   * Removes all of the elements from this collection.
+   */
+  void clear();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
new file mode 100644
index 0000000..7b17fb2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple runqueue for the procedures.
+ * <p>
+ * The queue is an ArrayDeque of procedure IDs guarded by a single ReentrantLock;
+ * every method takes the lock for the duration of its work.
+ * addFront()/addBack() store proc.getProcId() and signal one waiter, and yield()
+ * is the same as addBack().
+ * <p>
+ * poll() returns the head ID when the queue is not empty. When it is empty, poll()
+ * waits on the condition exactly once (not in a loop, hence the suppressed
+ * WA_AWAIT_NOT_IN_LOOP findbugs warning) and returns null if the queue is still
+ * empty after waking up. This is what lets signalAll() make a blocked poll() return.
+ * If the wait is interrupted, the interrupt flag is restored and null is returned.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
+  private final Deque<Long> runnables = new ArrayDeque<Long>();  // procedure IDs, head runs first
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition waitCond = lock.newCondition();  // poll() waits here while empty
+
+  @Override
+  public void addFront(final Procedure proc) {
+    lock.lock();
+    try {
+      runnables.addFirst(proc.getProcId());
+      waitCond.signal();  // one new item, so wake a single waiter
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void addBack(final Procedure proc) {
+    lock.lock();
+    try {
+      runnables.addLast(proc.getProcId());
+      waitCond.signal();  // one new item, so wake a single waiter
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void yield(final Procedure proc) {
+    addBack(proc);  // yielding just re-queues the procedure at the tail
+  }
+
+  @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  public Long poll() {
+    lock.lock();
+    try {
+      if (runnables.isEmpty()) {
+        waitCond.await();  // single wait: any wakeup (add, signalAll, spurious) falls through
+        if (!runnables.isEmpty()) {
+          return runnables.pop();
+        }
+      } else {
+        return runnables.pop();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
+      return null;
+    } finally {
+      lock.unlock();
+    }
+    return null;  // woken up but the queue is still empty
+  }
+
+  @Override
+  public void signalAll() {
+    lock.lock();
+    try {
+      waitCond.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void clear() {
+    lock.lock();
+    try {
+      runnables.clear();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int size() {
+    lock.lock();
+    try {
+      return runnables.size();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void completionCleanup(Procedure proc) {  // nothing to clean up for this queue
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
new file mode 100644
index 0000000..177ff5b
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+// Declared in the throws clause of the procedure execute methods. TODO: Not used yet
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureYieldException extends ProcedureException {
+  /** Creates a yield exception without a detail message. */
+  public ProcedureYieldException() {
+    super();
+  }
+
+  /**
+   * Creates a yield exception with the given detail message.
+   * @param s the message, passed as-is to the ProcedureException constructor
+   */
+  public ProcedureYieldException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
new file mode 100644
index 0000000..6be512d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A RemoteProcedureException is an exception from another thread or process.
+ * <p>
+ * RemoteProcedureExceptions are sent to 'remote' peers to signal an abort in the face of failures.
+ * When serialized for transmission we encode using Protobufs to ensure version compatibility.
+ * <p>
+ * RemoteProcedureExceptions contain a Throwable as their cause.
+ * This can be a "regular" exception generated locally or a ProxyThrowable that is a representation
+ * of the original exception created on the original 'remote' source.  These ProxyThrowables have
+ * their stack traces and messages overridden to reflect the original 'remote' exception.
+ * <p>
+ * The protobuf form is a ForeignExceptionMessage: convert()/toProto() build it through
+ * ForeignExceptionUtil, and fromProto()/deserialize() turn it back into an instance.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@SuppressWarnings("serial")
+public class RemoteProcedureException extends ProcedureException {
+
+  /**
+   * Name of the throwable's source such as a host or thread name.  Must be non-null.
+   */
+  private final String source;
+
+  /**
+   * Create a new RemoteProcedureException that can be serialized.
+   * It is assumed that this came from a local source.
+   * <p>
+   * Both arguments are validated with {@code assert} only, so a null is rejected
+   * only when assertions are enabled.
+   * @param source name of the throwable's source, such as a host or thread name
+   * @param cause the underlying failure, passed to the superclass as the cause
+   */
+  public RemoteProcedureException(String source, Throwable cause) {
+    super(cause);
+    assert source != null;
+    assert cause != null;
+    this.source = source;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public IOException unwrapRemoteException() {
+    if (getCause() instanceof RemoteException) {  // hadoop RemoteException: let it unwrap itself
+      return ((RemoteException)getCause()).unwrapRemoteException();
+    }
+    if (getCause() instanceof IOException) {  // already an IOException: hand it back unchanged
+      return (IOException)getCause();
+    }
+    return new IOException(getCause());  // anything else gets wrapped
+  }
+
+  @Override
+  public String toString() {
+    String className = getCause().getClass().getName();  // reports the cause's class, not this one
+    return className + " via " + getSource() + ":" + getLocalizedMessage();
+  }
+
+  /**
+   * Serializes the given source/throwable pair to an array of bytes.
+   * @param source the name of the external exception source
+   * @param t the "local" external exception
+   * @return the protobuf serialized ForeignExceptionMessage built from the two arguments
+   */
+  public static byte[] serialize(String source, Throwable t) {
+    return toProto(source, t).toByteArray();
+  }
+
+  /**
+   * Takes a series of bytes and tries to generate a RemoteProcedureException instance for it.
+   * @param bytes the bytes of a serialized ForeignExceptionMessage
+   * @return the RemoteProcedureException instance
+   * @throws InvalidProtocolBufferException if there was a deserialization problem
+   */
+  public static RemoteProcedureException deserialize(byte[] bytes)
+      throws InvalidProtocolBufferException {
+    return fromProto(ForeignExceptionMessage.parseFrom(bytes));
+  }
+
+  public ForeignExceptionMessage convert() {  // protobuf form of this instance (source + cause)
+    return ForeignExceptionUtil.toProtoForeignException(getSource(), getCause());
+  }
+
+  public static ForeignExceptionMessage toProto(String source, Throwable t) {
+    return ForeignExceptionUtil.toProtoForeignException(source, t);
+  }
+
+  public static RemoteProcedureException fromProto(final ForeignExceptionMessage eem) {
+    // the cause is whatever ForeignExceptionUtil.toIOException() rebuilds from the message
+    return new RemoteProcedureException(eem.getSource(), ForeignExceptionUtil.toIOException(eem));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
new file mode 100644
index 0000000..bc1af20
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
+ * A "Root Procedure" is a Procedure without parent, each subprocedure will be
+ * added to the "Root Procedure" stack (or rollback-stack).
+ *
+ * RootProcedureState is used and managed only by the ProcedureExecutor.
+ *    Long rootProcId = getRootProcedureId(proc);
+ *    rollbackStack.get(rootProcId).acquire(proc)
+ *    rollbackStack.get(rootProcId).release(proc)
+ *    ...
+ *
+ * All the methods are synchronized on this object.
+ * The state starts as RUNNING, becomes FAILED on abort() or when a failed step is
+ * added/loaded, moves to ROLLINGBACK with setRollback() and goes back to FAILED with
+ * unsetRollback(). isFailed() is true in both FAILED and ROLLINGBACK.
+ * getSubprocedures() returns the internal stack list itself (null until a step is
+ * added or loaded), and getException() returns the exception of the first procedure
+ * in the stack that has one, or null if there is none.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class RootProcedureState {
+  private static final Log LOG = LogFactory.getLog(RootProcedureState.class);
+
+  private enum State {
+    RUNNING,         // The Procedure is running or ready to run
+    FAILED,          // The Procedure failed or was aborted, the rollback is not in progress
+    ROLLINGBACK,     // The Procedure failed and the rollback was started (see setRollback())
+  }
+
+  private ArrayList<Procedure> subprocedures = null;  // the rollback stack, created lazily
+  private State state = State.RUNNING;
+  private int running = 0;  // steps acquired and not yet released
+
+  public synchronized boolean isFailed() {
+    switch (state) {
+      case ROLLINGBACK:
+      case FAILED:
+        return true;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  public synchronized boolean isRollingback() {
+    return state == State.ROLLINGBACK;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to mark rollback execution.
+   * @return true if the state was switched from FAILED to ROLLINGBACK, which happens
+   *         only when no step is currently running; false otherwise (nothing is changed).
+   */
+  protected synchronized boolean setRollback() {
+    if (running == 0 && state == State.FAILED) {
+      state = State.ROLLINGBACK;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to unmark rollback execution:
+   * the state goes from ROLLINGBACK back to FAILED.
+   */
+  protected synchronized void unsetRollback() {
+    assert state == State.ROLLINGBACK;
+    state = State.FAILED;
+  }
+
+  protected synchronized List<Procedure> getSubprocedures() {
+    return subprocedures;  // internal list, may be null
+  }
+
+  protected synchronized RemoteProcedureException getException() {
+    if (subprocedures != null) {
+      for (Procedure proc: subprocedures) {
+        if (proc.hasException()) {
+          return proc.getException();  // first one found in stack order wins
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to mark the procedure step as running.
+   * @param proc the procedure about to run (not used by this implementation)
+   * @return true if the running count was incremented; false if the root procedure
+   *         is not in RUNNING state, in which case nothing is changed.
+   */
+  protected synchronized boolean acquire(final Procedure proc) {
+    if (state != State.RUNNING) return false;
+
+    running++;
+    return true;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to mark the procedure step as finished.
+   * @param proc the procedure that finished the step (not used by this implementation)
+   */
+  protected synchronized void release(final Procedure proc) {
+    running--;
+  }
+
+  protected synchronized void abort() {
+    if (state == State.RUNNING) {  // no effect once already FAILED or ROLLINGBACK
+      state = State.FAILED;
+    }
+  }
+
+  /**
+   * Called by the ProcedureExecutor after the procedure step is completed,
+   * to add the step to the rollback list (or procedure stack).
+   * If the procedure is failed, the root state becomes FAILED.
+   * The procedure is told its position with addStackIndex() and is then appended.
+   */
+  protected synchronized void addRollbackStep(final Procedure proc) {
+    if (proc.isFailed()) {
+      state = State.FAILED;
+    }
+    if (subprocedures == null) {
+      subprocedures = new ArrayList<Procedure>();
+    }
+    proc.addStackIndex(subprocedures.size());  // the index the procedure is about to occupy
+    subprocedures.add(proc);
+  }
+
+  /**
+   * Called on store load by the ProcedureExecutor to load part of the stack.
+   *
+   * Each procedure has its own stack-positions. Which means we have to write
+   * to the store only the Procedure we executed, and nothing else.
+   * On load we recreate the full stack by aggregating each procedure stack-positions.
+   *
+   * The stack is padded with nulls up to the last stack index of the procedure and
+   * the procedure is then stored at each of its indexes. Slots that are never filled
+   * stay null and are detected by isValid().
+   */
+  protected synchronized void loadStack(final Procedure proc) {
+    int[] stackIndexes = proc.getStackIndexes();
+    if (stackIndexes != null) {
+      if (subprocedures == null) {
+        subprocedures = new ArrayList<Procedure>();
+      }
+      // the last index is used as the highest one: assumes ascending order - TODO confirm
+      int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size();
+      if (diff > 0) {
+        subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
+        while (diff-- > 0) subprocedures.add(null);
+      }
+      for (int i = 0; i < stackIndexes.length; ++i) {
+        subprocedures.set(stackIndexes[i], proc);
+      }
+    }
+    if (proc.getState() == ProcedureState.ROLLEDBACK) {
+      state = State.ROLLINGBACK;
+    } else if (proc.isFailed()) {
+      state = State.FAILED;
+    }
+  }
+
+  /**
+   * Called on store load by the ProcedureExecutor to validate the procedure stack.
+   * @return false if the stack contains an empty (null) slot; true otherwise,
+   *         including when there is no stack at all.
+   */
+  protected synchronized boolean isValid() {
+    if (subprocedures != null) {
+      for (Procedure proc: subprocedures) {
+        if (proc == null) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
new file mode 100644
index 0000000..b4b35f2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.SequentialProcedureData;
+
+/**
+ * A SequentialProcedure describes one step in a procedure chain.
+ *   -> Step 1 -> Step 2 -> Step 3
+ *
+ * The main difference from a base Procedure is that the execute() of a
+ * SequentialProcedure will be called only once; there will be no second
+ * execute() call once the children are finished, which means that once the children
+ * of a SequentialProcedure are completed the SequentialProcedure is completed too.
+ * <p>
+ * This is tracked by the 'executed' flag, which is saved and restored with the
+ * procedure state data: doExecute() calls execute() only while the flag is false and
+ * then flips it; doRollback() calls rollback() only while the flag is true and flips
+ * it back once rollback() returned normally.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvironment> {
+  private boolean executed = false;  // true once execute() has run and was not rolled back
+
+  @Override
+  protected Procedure[] doExecute(final TEnvironment env)
+      throws ProcedureYieldException {
+    updateTimestamp();
+    try {
+      Procedure[] children = !executed ? execute(env) : null;  // already executed: no children
+      executed = !executed;  // toggled, not set: skipped if execute() threw
+      return children;
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  @Override
+  protected void doRollback(final TEnvironment env) throws IOException {
+    updateTimestamp();
+    if (executed) {  // nothing to undo if execute() never ran
+      try {
+        rollback(env);
+        executed = !executed;  // reached only when rollback() did not throw
+      } finally {
+        updateTimestamp();
+      }
+    }
+  }
+
+  @Override
+  protected void serializeStateData(final OutputStream stream) throws IOException {
+    SequentialProcedureData.Builder data = SequentialProcedureData.newBuilder();
+    data.setExecuted(executed);
+    data.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  protected void deserializeStateData(final InputStream stream) throws IOException {
+    SequentialProcedureData data = SequentialProcedureData.parseDelimitedFrom(stream);
+    executed = data.getExecuted();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
new file mode 100644
index 0000000..eab96e4
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
+
+/**
+ * Procedure described by a series of steps.
+ *
+ * The procedure implementor must have an enum of 'states', describing
+ * the various steps of the procedure.
+ * Once the procedure is running, the procedure-framework will call executeFromState()
+ * using the 'state' provided by the user. The first call to executeFromState()
+ * will be performed with the state returned by getInitialState(). The implementor
+ * can jump between states using setNextState(MyStateEnum.SOME_STATE).
+ * The rollback will call rollbackState() for each state that was executed, in reverse order.
+ *
+ * Internally the states are kept as a stack of int state ids, converted with
+ * getStateId()/getState(); that stack is what gets serialized as the state data.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class StateMachineProcedure<TEnvironment, TState>
+    extends Procedure<TEnvironment> {
+  private int stateCount = 0;  // number of valid entries in 'states'
+  private int[] states = null;  // stack of state ids, the last valid entry is the current state
+
+  protected enum Flow {
+    HAS_MORE_STATE,
+    NO_MORE_STATE,
+  }
+
+  /**
+   * called to perform a single step of the specified 'state' of the procedure
+   * @param env the environment passed to execute()
+   * @param state state to execute
+   * @return Flow.NO_MORE_STATE if the procedure is completed,
+   *         Flow.HAS_MORE_STATE if there is another step.
+   */
+  protected abstract Flow executeFromState(TEnvironment env, TState state)
+    throws ProcedureYieldException;
+
+  /**
+   * called to perform the rollback of the specified state
+   * @param env the environment passed to rollback()
+   * @param state state to rollback
+   * @throws IOException temporary failure, the rollback will retry later
+   */
+  protected abstract void rollbackState(TEnvironment env, TState state)
+    throws IOException;
+
+  /**
+   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
+   * @param stateId the ordinal() of the state enum (or state id)
+   * @return the state enum object
+   */
+  protected abstract TState getState(int stateId);
+
+  /**
+   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
+   * @param state the state enum object
+   * @return stateId the ordinal() of the state enum (or state id)
+   */
+  protected abstract int getStateId(TState state);
+
+  /**
+   * Return the initial state object that will be used for the first call to executeFromState().
+   * @return the initial state enum object
+   */
+  protected abstract TState getInitialState();
+
+  /**
+   * Set the next state for the procedure.
+   * The state is converted with getStateId() and pushed on the state stack.
+   * @param state the state enum object
+   */
+  protected void setNextState(final TState state) {
+    setNextState(getStateId(state));
+  }
+
+  @Override
+  protected Procedure[] execute(final TEnvironment env)
+      throws ProcedureYieldException {
+    updateTimestamp();
+    try {
+      TState state = stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
+      if (stateCount == 0) {  // first call: record the initial state on the stack
+        setNextState(getStateId(state));
+      }
+      if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
+        // completed
+        return null;
+      }
+      // more states to run: hand back this procedure itself unless it is waiting or failed
+      return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  @Override
+  protected void rollback(final TEnvironment env) throws IOException {
+    try {
+      updateTimestamp();
+      rollbackState(env, stateCount > 0 ? getState(states[stateCount-1]) : getInitialState());
+      stateCount--;  // the state is popped only if rollbackState() returned normally
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  /**
+   * Set the next state for the procedure.
+   * The id is pushed on the state stack; the backing array is created on first use
+   * and grown by 8 slots each time it is full.
+   * @param stateId the ordinal() of the state enum (or state id)
+   */
+  private void setNextState(final int stateId) {
+    if (states == null || states.length == stateCount) {
+      int newCapacity = stateCount + 8;
+      if (states != null) {
+        states = Arrays.copyOf(states, newCapacity);
+      } else {
+        states = new int[newCapacity];
+      }
+    }
+    states[stateCount++] = stateId;
+  }
+
+  @Override
+  protected void serializeStateData(final OutputStream stream) throws IOException {
+    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
+    for (int i = 0; i < stateCount; ++i) {  // only the used part of the array is written
+      data.addState(states[i]);
+    }
+    data.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  protected void deserializeStateData(final InputStream stream) throws IOException {
+    StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
+    stateCount = data.getStateCount();
+    if (stateCount > 0) {
+      states = new int[stateCount];  // exactly full: the next setNextState() will grow it
+      for (int i = 0; i < stateCount; ++i) {
+        states[i] = data.getState(i);
+      }
+    } else {
+      states = null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
new file mode 100644
index 0000000..cd6b0a7
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Placeholder base class for two-phase procedures.
+ * No behavior is defined here yet: the class only extends {@link Procedure}
+ * and keeps the same environment type parameter.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TwoPhaseProcedure<TEnvironment> extends Procedure<TEnvironment> {
+  // TODO (e.g. used by ACLs/VisibilityTags updates)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
new file mode 100644
index 0000000..0d1c050
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+
+/**
+ * The ProcedureStore is used by the executor to persist the state of each procedure execution.
+ * This allows the executor to resume the execution of pending/in-progress procedures in case
+ * of machine failure or service shutdown.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureStore {
+  /**
+   * Store listener interface.
+   * The main process should register a listener and respond to the store events.
+   */
+  public interface ProcedureStoreListener {
+    /**
+     * Triggered when the store is not able to write out data.
+     * The main process should abort.
+     */
+    void abortProcess();
+  }
+
+  /**
+   * Add the listener to the notification list.
+   * @param listener The ProcedureStoreListener to register
+   */
+  void registerListener(ProcedureStoreListener listener);
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The ProcedureStoreListener to unregister
+   * @return true if the listener was in the list and it was removed, otherwise false.
+   */
+  boolean unregisterListener(ProcedureStoreListener listener);
+
+  /**
+   * Start/Open the procedure store
+   * @param numThreads the number of threads/slots, later returned by getNumThreads()
+   * @throws IOException if the store cannot be started
+   */
+  void start(int numThreads) throws IOException;
+
+  /**
+   * Stop/Close the procedure store
+   * @param abort true if the stop is an abort
+   */
+  void stop(boolean abort);
+
+  /**
+   * @return true if the store is running, otherwise false.
+   */
+  boolean isRunning();
+
+  /**
+   * @return the number of threads/slots passed to start()
+   */
+  int getNumThreads();
+
+  /**
+   * Acquire the lease for the procedure store.
+   * @throws IOException if the lease cannot be acquired
+   */
+  void recoverLease() throws IOException;
+
+  /**
+   * Load the Procedures in the store.
+   * @return the set of procedures present in the store
+   * @throws IOException if the procedures cannot be read from the store
+   */
+  Iterator<Procedure> load() throws IOException;
+
+  /**
+   * When a procedure is submitted to the executor insert(proc, null) will be called.
+   * 'proc' has a 'RUNNABLE' state and the initial information required to start up.
+   *
+   * When a procedure is executed and it returns children insert(proc, subprocs) will be called.
+   * 'proc' has a 'WAITING' state and an updated state.
+   * 'subprocs' are the children in 'RUNNABLE' state with the initial information.
+   *
+   * @param proc the procedure to serialize and write to the store.
+   * @param subprocs the newly created children of the proc, or null if there are none.
+   */
+  void insert(Procedure proc, Procedure[] subprocs);
+
+  /**
+   * The specified procedure was executed,
+   * and the new state should be written to the store.
+   * @param proc the procedure to serialize and write to the store.
+   */
+  void update(Procedure proc);
+
+  /**
+   * The specified procId was removed from the executor,
+   * due to completion, abort or failure.
+   * The store implementor should remove all the information about the specified procId.
+   * @param procId the ID of the procedure to remove.
+   */
+  void delete(long procId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
new file mode 100644
index 0000000..4e4653a
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+
+/**
+ * Keeps track of live procedures.
+ *
+ * It can be used by the ProcedureStore to identify which procedures are already
+ * deleted/completed to avoid the deserialization step on restart.
+ *
+ * The tracker is a sorted map of {@link BitSetNode}, keyed by the first procId
+ * covered by each node. Each node covers a contiguous range of procIds with two
+ * parallel bitmaps ("updated" and "deleted").
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureStoreTracker {
+  // nodes keyed by their start procId; lookups use floorEntry(procId)
+  private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
+
+  // when false, a node is dropped from the map as soon as all its ids are deleted
+  private boolean keepDeletes = false;
+  // when true, ids that were never updated are reported as DeleteState.MAYBE
+  private boolean partial = false;
+
+  public enum DeleteState { YES, NO, MAYBE }
+
+  /**
+   * Bitmap covering the procIds in the range [getStart(), getEnd()].
+   * Bit (procId - start) of "updated" is set once the id was updated or deleted,
+   * bit (procId - start) of "deleted" is set while the id is considered deleted.
+   */
+  public static class BitSetNode {
+    private final static long WORD_MASK = 0xffffffffffffffffL;
+    private final static int ADDRESS_BITS_PER_WORD = 6;
+    private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+    private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
+
+    private long[] updated;
+    private long[] deleted;
+    private long start;
+
+    /**
+     * Print the node range, min/max procId and both bitmaps to stdout (debug helper).
+     */
+    public void dump() {
+      System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
+        getMinProcId(), getMaxProcId());
+      System.out.println("Update:");
+      for (int i = 0; i < updated.length; ++i) {
+        for (int j = 0; j < BITS_PER_WORD; ++j) {
+          System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
+        }
+        System.out.println(" " + i);
+      }
+      System.out.println();
+      System.out.println("Delete:");
+      for (int i = 0; i < deleted.length; ++i) {
+        for (int j = 0; j < BITS_PER_WORD; ++j) {
+          System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
+        }
+        System.out.println(" " + i);
+      }
+      System.out.println();
+    }
+
+    /**
+     * Create a two-word node whose range starts at procId aligned down to a word boundary,
+     * and mark procId as updated and not deleted.
+     * @param procId the first procedure id tracked by the node
+     * @param partial if true every other id starts as "not deleted",
+     *                otherwise every other id starts as "deleted"
+     */
+    public BitSetNode(final long procId, final boolean partial) {
+      start = alignDown(procId);
+
+      int count = 2;
+      updated = new long[count];
+      deleted = new long[count];
+      for (int i = 0; i < count; ++i) {
+        updated[i] = 0;
+        deleted[i] = partial ? 0 : WORD_MASK;
+      }
+
+      updateState(procId, false);
+    }
+
+    /**
+     * Create a node from already built bitmaps (the arrays are used as-is, not copied).
+     */
+    protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
+      this.start = start;
+      this.updated = updated;
+      this.deleted = deleted;
+    }
+
+    /** Mark procId as updated and not deleted. */
+    public void update(final long procId) {
+      updateState(procId, false);
+    }
+
+    /** Mark procId as updated and deleted. */
+    public void delete(final long procId) {
+      updateState(procId, true);
+    }
+
+    /** @return the first procId covered by the node */
+    public Long getStart() {
+      return start;
+    }
+
+    /** @return the last procId covered by the node (inclusive) */
+    public Long getEnd() {
+      return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
+    }
+
+    /** @return true if procId is in the range [getStart(), getEnd()] */
+    public boolean contains(final long procId) {
+      return start <= procId && procId <= getEnd();
+    }
+
+    /**
+     * @return YES/NO depending on the "deleted" bit of procId,
+     *         or MAYBE if procId is past the end of the bitmap
+     */
+    public DeleteState isDeleted(final long procId) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      if (wordIndex >= deleted.length) {
+        return DeleteState.MAYBE;
+      }
+      // a long shift only uses the low 6 bits of the distance, so this selects the bit in the word
+      return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
+    }
+
+    private boolean isUpdated(final long procId) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      if (wordIndex >= updated.length) {
+        return false;
+      }
+      return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
+    }
+
+    /** @return true if every id that is not deleted has its "updated" bit set */
+    public boolean isUpdated() {
+      // TODO: cache the value
+      for (int i = 0; i < updated.length; ++i) {
+        long deleteMask = ~deleted[i];
+        if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** @return true if every id of the node is marked as deleted */
+    public boolean isEmpty() {
+      // TODO: cache the value
+      for (int i = 0; i < deleted.length; ++i) {
+        if (deleted[i] != WORD_MASK) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Clear every "updated" bit. */
+    public void resetUpdates() {
+      for (int i = 0; i < updated.length; ++i) {
+        updated[i] = 0;
+      }
+    }
+
+    /** Clear every "deleted" bit. */
+    public void undeleteAll() {
+      // iterate on the array that is actually written
+      for (int i = 0; i < deleted.length; ++i) {
+        deleted[i] = 0;
+      }
+    }
+
+    /** @return the protobuf representation of the node (start id plus both bitmaps) */
+    public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
+      ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
+        ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
+      builder.setStartId(start);
+      for (int i = 0; i < updated.length; ++i) {
+        builder.addUpdated(updated[i]);
+        builder.addDeleted(deleted[i]);
+      }
+      return builder.build();
+    }
+
+    /**
+     * Build a node from its protobuf representation.
+     * The number of "updated" words is used as the size of both bitmaps.
+     */
+    public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+      long start = data.getStartId();
+      int size = data.getUpdatedCount();
+      long[] updated = new long[size];
+      long[] deleted = new long[size];
+      for (int i = 0; i < size; ++i) {
+        updated[i] = data.getUpdated(i);
+        deleted[i] = data.getDeleted(i);
+      }
+      return new BitSetNode(start, updated, deleted);
+    }
+
+    // ========================================================================
+    //  Grow/Merge Helpers
+    // ========================================================================
+    /**
+     * @return true if the node can be extended to cover procId while spanning
+     *         less than MAX_NODE_SIZE ids
+     */
+    public boolean canGrow(final long procId) {
+      if (procId < start) {
+        // head growth: the node would span from procId to the current end.
+        // (procId - start) is negative here, so it must not be compared directly.
+        return (getEnd() - procId) < MAX_NODE_SIZE;
+      }
+      // tail growth: the node would span from start to procId
+      return (procId - start) < MAX_NODE_SIZE;
+    }
+
+    /**
+     * @param rightNode a node located after this one
+     * @return true if the range from this node start to the rightNode end
+     *         spans less than MAX_NODE_SIZE ids
+     */
+    public boolean canMerge(final BitSetNode rightNode) {
+      // the size of the merged node is a distance, not a sum of positions
+      return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
+    }
+
+    /**
+     * Extend the node, at the head or at the tail, so that it covers procId.
+     * The ids added by the extension start as "not updated" and "deleted".
+     * When the head is extended the node start changes.
+     */
+    public void grow(final long procId) {
+      int delta, offset;
+
+      if (procId < start) {
+        // add to head
+        long newStart = alignDown(procId);
+        delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
+        offset = delta;
+        // the existing words are shifted by 'delta', so the node must now begin at newStart
+        start = newStart;
+      } else {
+        // Add to tail
+        long newEnd = alignUp(procId + 1);
+        delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
+        offset = 0;
+      }
+
+      int oldSize = updated.length;
+      // the new words are the first 'delta' words when growing the head,
+      // and the last 'delta' words when growing the tail
+      int fillStart = (offset == 0) ? oldSize : 0;
+
+      // new "updated" words are left at 0 by the array allocation
+      long[] newUpdated = new long[oldSize + delta];
+      System.arraycopy(updated, 0, newUpdated, offset, oldSize);
+
+      long[] newDeleted = new long[oldSize + delta];
+      System.arraycopy(deleted, 0, newDeleted, offset, oldSize);
+      for (int i = 0; i < delta; ++i) {
+        newDeleted[fillStart + i] = WORD_MASK;
+      }
+
+      updated = newUpdated;
+      deleted = newDeleted;
+    }
+
+    /**
+     * Extend this node up to the end of rightNode and copy the rightNode bitmaps in.
+     * The ids in the gap between the two nodes start as "not updated" and "deleted".
+     * @param rightNode a node located after this one
+     */
+    public void merge(final BitSetNode rightNode) {
+      int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
+
+      int oldSize = updated.length;
+      // number of words between the end of this node and the start of rightNode
+      int gapSize = (delta - rightNode.updated.length);
+      // position of the first rightNode word in the merged bitmaps
+      int offset = oldSize + gapSize;
+
+      long[] newUpdated = new long[oldSize + delta];
+      System.arraycopy(updated, 0, newUpdated, 0, oldSize);
+      System.arraycopy(rightNode.updated, 0, newUpdated, offset, rightNode.updated.length);
+
+      long[] newDeleted = new long[oldSize + delta];
+      System.arraycopy(deleted, 0, newDeleted, 0, oldSize);
+      System.arraycopy(rightNode.deleted, 0, newDeleted, offset, rightNode.deleted.length);
+
+      // only the gap words [oldSize, offset) are initialized here;
+      // the words copied from rightNode must be left untouched
+      for (int i = 0; i < gapSize; ++i) {
+        newUpdated[oldSize + i] = 0;
+        newDeleted[oldSize + i] = WORD_MASK;
+      }
+
+      updated = newUpdated;
+      deleted = newDeleted;
+    }
+
+    // ========================================================================
+    //  Min/Max Helpers
+    // ========================================================================
+    /**
+     * @return the smallest procId of the node that is not deleted,
+     *         or getEnd() + 1 if every id is deleted
+     */
+    public long getMinProcId() {
+      long minProcId = start;
+      for (int i = 0; i < deleted.length; ++i) {
+        if (deleted[i] == 0) {
+          return(minProcId);
+        }
+
+        if (deleted[i] != WORD_MASK) {
+          for (int j = 0; j < BITS_PER_WORD; ++j) {
+            // look for the first cleared bit, same test used by getMaxProcId()
+            if ((deleted[i] & (1L << j)) == 0) {
+              return minProcId + j;
+            }
+          }
+        }
+
+        minProcId += BITS_PER_WORD;
+      }
+      return minProcId;
+    }
+
+    /**
+     * @return the largest procId of the node that is not deleted,
+     *         or getStart() - 1 if every id is deleted
+     */
+    public long getMaxProcId() {
+      long maxProcId = getEnd();
+      for (int i = deleted.length - 1; i >= 0; --i) {
+        if (deleted[i] == 0) {
+          return maxProcId;
+        }
+
+        if (deleted[i] != WORD_MASK) {
+          for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
+            if ((deleted[i] & (1L << j)) == 0) {
+              return maxProcId - (BITS_PER_WORD - 1 - j);
+            }
+          }
+        }
+        maxProcId -= BITS_PER_WORD;
+      }
+      return maxProcId;
+    }
+
+    // ========================================================================
+    //  Bitmap Helpers
+    // ========================================================================
+    private int getBitmapIndex(final long procId) {
+      return (int)(procId - start);
+    }
+
+    // Set the "updated" bit of procId, and set or clear its "deleted" bit.
+    // procId must be covered by the node, no bound check is done here.
+    private void updateState(final long procId, final boolean isDeleted) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      long value = (1L << bitmapIndex);
+
+      if (isDeleted) {
+        updated[wordIndex] |= value;
+        deleted[wordIndex] |= value;
+      } else {
+        updated[wordIndex] |= value;
+        deleted[wordIndex] &= ~value;
+      }
+    }
+
+    // ========================================================================
+    //  Helpers
+    // ========================================================================
+    // round x up to the next multiple of BITS_PER_WORD
+    private static long alignUp(final long x) {
+      return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
+    }
+
+    // round x down to the previous multiple of BITS_PER_WORD
+    private static long alignDown(final long x) {
+      return x & -BITS_PER_WORD;
+    }
+  }
+
+  /**
+   * Track the specified procedure and, if any, its sub-procedures.
+   * @param proc the procedure to track
+   * @param subprocs the children to track, may be null
+   */
+  public void insert(final Procedure proc, final Procedure[] subprocs) {
+    insert(proc.getProcId());
+    if (subprocs != null) {
+      for (int i = 0; i < subprocs.length; ++i) {
+        insert(subprocs[i].getProcId());
+      }
+    }
+  }
+
+  /** Mark the specified procedure as updated. */
+  public void update(final Procedure proc) {
+    update(proc.getProcId());
+  }
+
+  /** Track procId, creating, growing or merging nodes as needed to cover it. */
+  public void insert(long procId) {
+    BitSetNode node = getOrCreateNode(procId);
+    node.update(procId);
+  }
+
+  /** Mark procId as updated. A node covering procId is expected to exist. */
+  public void update(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    assert entry != null : "expected node to update procId=" + procId;
+
+    BitSetNode node = entry.getValue();
+    assert node.contains(procId);
+    node.update(procId);
+  }
+
+  /**
+   * Mark procId as deleted. A node covering procId is expected to exist.
+   * Unless keepDeletes is set, the node is removed once all its ids are deleted.
+   */
+  public void delete(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    assert entry != null : "expected node to delete procId=" + procId;
+
+    BitSetNode node = entry.getValue();
+    assert node.contains(procId) : "expected procId in the node";
+    node.delete(procId);
+
+    if (!keepDeletes && node.isEmpty()) {
+      // TODO: RESET if (map.size() == 1)
+      map.remove(entry.getKey());
+    }
+  }
+
+  /** Set the deleted state of procId, creating the node covering it if needed. */
+  @InterfaceAudience.Private
+  public void setDeleted(final long procId, final boolean isDeleted) {
+    BitSetNode node = getOrCreateNode(procId);
+    node.updateState(procId, isDeleted);
+  }
+
+  /** Remove every node from the tracker. */
+  public void clear() {
+    this.map.clear();
+  }
+
+  /**
+   * @return the delete state of procId. If the tracker is partial, MAYBE is returned
+   *         for the ids that were not updated or that are not covered by any node;
+   *         otherwise an id not covered by any node is reported as deleted.
+   */
+  public DeleteState isDeleted(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    if (entry != null) {
+      BitSetNode node = entry.getValue();
+      DeleteState state = node.isDeleted(procId);
+      return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
+    }
+    return partial ? DeleteState.MAYBE : DeleteState.YES;
+  }
+
+  /** @return the min procId of the first node, or 0 if the tracker has no nodes */
+  public long getMinProcId() {
+    // TODO: Cache?
+    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
+    return entry == null ? 0 : entry.getValue().getMinProcId();
+  }
+
+  /**
+   * @param keepDeletes if false, the nodes whose ids are all deleted are removed,
+   *                    now and on every following delete()
+   */
+  public void setKeepDeletes(boolean keepDeletes) {
+    this.keepDeletes = keepDeletes;
+    if (!keepDeletes) {
+      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<Long, BitSetNode> entry = it.next();
+        if (entry.getValue().isEmpty()) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /** @param isPartial see {@link #isDeleted(long)} and the BitSetNode constructor */
+  public void setPartialFlag(boolean isPartial) {
+    this.partial = isPartial;
+  }
+
+  /** @return true if every node has all its ids deleted (or there are no nodes) */
+  public boolean isEmpty() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      if (entry.getValue().isEmpty() == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** @return true if in every node all the ids that are not deleted are updated */
+  public boolean isUpdated() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      if (entry.getValue().isUpdated() == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Clear the "updated" bits of every node. */
+  public void resetUpdates() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().resetUpdates();
+    }
+  }
+
+  /** Clear the "deleted" bits of every node. */
+  public void undeleteAll() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().undeleteAll();
+    }
+  }
+
+  // Return the node covering procId. If none exists, in order of preference:
+  // merge the two neighbours, grow the closest neighbour, or add a new node.
+  private BitSetNode getOrCreateNode(final long procId) {
+    // can procId fit in the left node?
+    BitSetNode leftNode = null;
+    boolean leftCanGrow = false;
+    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
+    if (leftEntry != null) {
+      leftNode = leftEntry.getValue();
+      if (leftNode.contains(procId)) {
+        return leftNode;
+      }
+      leftCanGrow = leftNode.canGrow(procId);
+    }
+
+    BitSetNode rightNode = null;
+    boolean rightCanGrow = false;
+    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
+    if (rightEntry != null) {
+      rightNode = rightEntry.getValue();
+      rightCanGrow = rightNode.canGrow(procId);
+      if (leftNode != null) {
+        if (leftNode.canMerge(rightNode)) {
+          // merge left and right node
+          return mergeNodes(leftNode, rightNode);
+        }
+
+        if (leftCanGrow && rightCanGrow) {
+          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
+            // grow the left node
+            return growNode(leftNode, procId);
+          }
+          // grow the right node
+          return growNode(rightNode, procId);
+        }
+      }
+    }
+
+    // grow the left node
+    if (leftCanGrow) {
+      return growNode(leftNode, procId);
+    }
+
+    // grow the right node
+    if (rightCanGrow) {
+      return growNode(rightNode, procId);
+    }
+
+    // add new node
+    BitSetNode node = new BitSetNode(procId, partial);
+    map.put(node.getStart(), node);
+    return node;
+  }
+
+  // The node is re-inserted because growing the head changes its start, which is the map key.
+  private BitSetNode growNode(BitSetNode node, long procId) {
+    map.remove(node.getStart());
+    node.grow(procId);
+    map.put(node.getStart(), node);
+    return node;
+  }
+
+  // The left node absorbs the right one, which is then removed from the map.
+  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
+    leftNode.merge(rightNode);
+    map.remove(rightNode.getStart());
+    return leftNode;
+  }
+
+  /** Print the number of nodes and the content of each node to stdout (debug helper). */
+  public void dump() {
+    System.out.println("map " + map.size());
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().dump();
+    }
+  }
+
+  /**
+   * Write every node to the stream as a single length-delimited protobuf message.
+   * @throws IOException if the write fails
+   */
+  public void writeTo(final OutputStream stream) throws IOException {
+    ProcedureProtos.ProcedureStoreTracker.Builder builder =
+        ProcedureProtos.ProcedureStoreTracker.newBuilder();
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      builder.addNode(entry.getValue().convert());
+    }
+    builder.build().writeDelimitedTo(stream);
+  }
+
+  /**
+   * Replace the content of the tracker with the nodes read from the stream.
+   * @throws IOException if the read fails
+   */
+  public void readFrom(final InputStream stream) throws IOException {
+    ProcedureProtos.ProcedureStoreTracker data =
+        ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
+    map.clear();
+    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
+      BitSetNode node = BitSetNode.convert(protoNode);
+      map.put(node.getStart(), node);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
new file mode 100644
index 0000000..29db3bf
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when a procedure WAL is corrupted
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CorruptedWALProcedureStoreException extends HBaseIOException {
+  /** Create the exception without a message. */
+  public CorruptedWALProcedureStoreException() {
+    super();
+  }
+
+  /**
+   * Create the exception with the specified message.
+   * @param s message
+   */
+  public CorruptedWALProcedureStoreException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
new file mode 100644
index 0000000..859b3cb
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
+
+/**
+ * Describes a WAL File.
+ * Files are ordered and compared by the log id stored in their header,
+ * so the header must be available (passed to the constructor or loaded by
+ * {@link #open()}) before compareTo()/equals()/getLogId()/isCompacted() are used.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
+  private static final Log LOG = LogFactory.getLog(ProcedureWALFile.class);
+
+  private ProcedureWALHeader header;
+  private FSDataInputStream stream;
+  private FileStatus logStatus;
+  private FileSystem fs;
+  private Path logFile;
+  // position of the stream right after the header
+  private long startPos;
+
+  /**
+   * Describe an existing file; the header is read by the first call to {@link #open()}.
+   * @param fs the file system that contains the log
+   * @param logStatus the status of the log file
+   */
+  public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
+    this.fs = fs;
+    this.logStatus = logStatus;
+    this.logFile = logStatus.getPath();
+  }
+
+  /**
+   * Describe a file whose header is already known.
+   * NOTE(review): logStatus is left null by this constructor, so readTrailer(),
+   * readTracker() and getSize() fail with a NullPointerException on such instances.
+   * @param fs the file system that contains the log
+   * @param logFile the path of the log file
+   * @param header the header of the log
+   * @param startPos the position of the first byte after the header
+   */
+  public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
+    this.fs = fs;
+    this.logFile = logFile;
+    this.header = header;
+    this.startPos = startPos;
+  }
+
+  /**
+   * Open the stream, if not already open, and position it right after the header.
+   * The header is read from the stream when it is not known yet.
+   * @throws IOException if the file cannot be opened or read
+   */
+  public void open() throws IOException {
+    if (stream == null) {
+      stream = fs.open(logFile);
+    }
+
+    if (header == null) {
+      header = ProcedureWALFormat.readHeader(stream);
+      startPos = stream.getPos();
+    } else {
+      stream.seek(startPos);
+    }
+  }
+
+  /**
+   * Read the trailer of the log. The stream is moved back to the position
+   * after the header even if the read fails.
+   * @return the trailer of the log
+   * @throws IOException if the trailer cannot be read
+   */
+  public ProcedureWALTrailer readTrailer() throws IOException {
+    try {
+      return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
+    } finally {
+      stream.seek(startPos);
+    }
+  }
+
+  /**
+   * Load into the tracker the data stored at the position specified by the trailer.
+   * The stream is moved back to the position after the header even if the read fails.
+   * @param tracker the tracker to fill
+   * @throws IOException if the trailer or the tracker cannot be read
+   */
+  public void readTracker(ProcedureStoreTracker tracker) throws IOException {
+    ProcedureWALTrailer trailer = readTrailer();
+    try {
+      stream.seek(trailer.getTrackerPos());
+      tracker.readFrom(stream);
+    } finally {
+      stream.seek(startPos);
+    }
+  }
+
+  /**
+   * Close the stream, if open. A failure is logged and not propagated.
+   */
+  public void close() {
+    if (stream == null) return;
+    try {
+      stream.close();
+    } catch (IOException e) {
+      LOG.warn("unable to close the wal file: " + logFile, e);
+    } finally {
+      stream = null;
+    }
+  }
+
+  /** @return the underlying stream, or null if the file is not open */
+  public FSDataInputStream getStream() {
+    return stream;
+  }
+
+  /** @return the header of the log, or null if it was not loaded yet */
+  public ProcedureWALHeader getHeader() {
+    return header;
+  }
+
+  /** @return true if the header type is LOG_TYPE_COMPACTED */
+  public boolean isCompacted() {
+    return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
+  }
+
+  /** @return the log id stored in the header */
+  public long getLogId() {
+    return header.getLogId();
+  }
+
+  /** @return the length of the file, as reported by the file status */
+  public long getSize() {
+    return logStatus.getLen();
+  }
+
+  /**
+   * Close the stream and delete the file (non recursive delete).
+   * @throws IOException if the delete fails
+   */
+  public void removeFile() throws IOException {
+    close();
+    fs.delete(logFile, false);
+  }
+
+  /**
+   * Order the files by log id.
+   */
+  @Override
+  public int compareTo(final ProcedureWALFile other) {
+    // compare the two ids directly: the difference of two longs can overflow
+    // and end up with the wrong sign
+    final long thisLogId = header.getLogId();
+    final long otherLogId = other.header.getLogId();
+    return (thisLogId < otherLogId) ? -1 : ((thisLogId > otherLogId) ? 1 : 0);
+  }
+
+  /**
+   * Two files are equal when they have the same log id.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ProcedureWALFile)) return false;
+    return compareTo((ProcedureWALFile)o) == 0;
+  }
+
+  // NOTE(review): equals() is based on the log id while the hash is based on the path,
+  // so two equal files with different paths have different hash codes.
+  @Override
+  public int hashCode() {
+    return logFile.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return logFile.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
new file mode 100644
index 0000000..17432ac
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Helper class that contains the WAL serialization utils:
+ * reading/writing the header, the trailer and the single WAL entries.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ProcedureWALFormat {
+  static final byte LOG_TYPE_STREAM = 0;
+  static final byte LOG_TYPE_COMPACTED = 1;
+  static final byte LOG_TYPE_MAX_VALID = 1;
+
+  static final byte HEADER_VERSION = 1;
+  static final byte TRAILER_VERSION = 1;
+  static final long HEADER_MAGIC = 0x31764c4157637250L;
+  static final long TRAILER_MAGIC = 0x50726357414c7631L;
+
+  // Size of the fixed block at the very end of the file, see writeTrailer():
+  // version (1 byte) + TRAILER_MAGIC (long) + offset (long) = 17 bytes.
+  private static final int TRAILER_JUMP_SIZE = 1 + 8 + 8;
+
+  /**
+   * Signals that the header or the trailer of a WAL could not be validated.
+   */
+  @InterfaceAudience.Private
+  public static class InvalidWALDataException extends IOException {
+    public InvalidWALDataException(String s) {
+      super(s);
+    }
+
+    public InvalidWALDataException(Throwable t) {
+      super(t);
+    }
+  }
+
+  /**
+   * Callbacks handed to the reader while the logs are loaded.
+   */
+  interface Loader {
+    void removeLog(ProcedureWALFile log);
+    void markCorruptedWAL(ProcedureWALFile log, IOException e);
+  }
+
+  private ProcedureWALFormat() {}
+
+  /**
+   * Reads all the given logs, in iteration order, and returns the procedures found.
+   * Each log is opened before being read and closed afterwards, even if the read fails.
+   * The tracker is switched to "keep deletes" for the duration of the load; once every
+   * log has been read its partial flag is cleared and its updates are reset.
+   *
+   * @param logs the WAL files to read
+   * @param tracker the tracker updated with the procedures read from the logs
+   * @param loader the callbacks passed down to the reader
+   * @return an iterator over the procedures collected by the reader
+   * @throws IOException if a log cannot be opened or read
+   */
+  public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
+      final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
+    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
+    tracker.setKeepDeletes(true);
+    try {
+      while (logs.hasNext()) {
+        ProcedureWALFile log = logs.next();
+        log.open();
+        try {
+          reader.read(log, loader);
+        } finally {
+          log.close();
+        }
+      }
+      // The tracker is now updated with all the procedures read from the logs
+      tracker.setPartialFlag(false);
+      tracker.resetUpdates();
+    } finally {
+      tracker.setKeepDeletes(false);
+    }
+    // TODO: Write compacted version?
+    return reader.getProcedures();
+  }
+
+  /**
+   * Writes the header to the stream as a length-delimited message.
+   *
+   * @param stream the stream to write to
+   * @param header the header to serialize
+   * @throws IOException if the write fails
+   */
+  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
+      throws IOException {
+    header.writeDelimitedTo(stream);
+  }
+
+  /*
+   * +-----------------+
+   * | END OF WAL DATA | <---+
+   * +-----------------+     |
+   * |                 |     |
+   * |     Tracker     |     |
+   * |                 |     |
+   * +-----------------+     |
+   * |     version     |     |
+   * +-----------------+     |
+   * |  TRAILER_MAGIC  |     |
+   * +-----------------+     |
+   * |      offset     |-----+
+   * +-----------------+
+   */
+  /**
+   * Writes the trailer at the current position of the stream: an EOF entry, the
+   * tracker, the trailer version, the trailer magic and finally the offset at
+   * which the EOF entry starts (see the layout above).
+   *
+   * @param stream the WAL output stream
+   * @param tracker the tracker to serialize inside the trailer
+   * @throws IOException if any of the writes fails
+   */
+  public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
+      throws IOException {
+    long offset = stream.getPos();
+
+    // Write EOF Entry
+    ProcedureWALEntry.newBuilder()
+      .setType(ProcedureWALEntry.Type.EOF)
+      .build().writeDelimitedTo(stream);
+
+    // Write Tracker
+    tracker.writeTo(stream);
+
+    stream.write(TRAILER_VERSION);
+    StreamUtils.writeLong(stream, TRAILER_MAGIC);
+    StreamUtils.writeLong(stream, offset);
+  }
+
+  /**
+   * Reads and validates the WAL header from the stream.
+   *
+   * @param stream the stream positioned at the beginning of the header
+   * @return the parsed header
+   * @throws InvalidWALDataException if the header cannot be parsed, there is no data
+   *         to read, or the version/type are not the expected ones
+   * @throws IOException if the read fails for any other reason
+   */
+  public static ProcedureWALHeader readHeader(InputStream stream)
+      throws IOException {
+    ProcedureWALHeader header;
+    try {
+      header = ProcedureWALHeader.parseDelimitedFrom(stream);
+    } catch (InvalidProtocolBufferException e) {
+      throw new InvalidWALDataException(e);
+    }
+
+    if (header == null) {
+      throw new InvalidWALDataException("No data available to read the Header");
+    }
+
+    // A negative version can never match HEADER_VERSION, a single check is enough
+    if (header.getVersion() != HEADER_VERSION) {
+      throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
+          " expected " + HEADER_VERSION);
+    }
+
+    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
+      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
+    }
+
+    return header;
+  }
+
+  /**
+   * Reads the trailer by jumping to the fixed-size block at the end of the file,
+   * validating version and magic, and then following the stored offset back to
+   * the EOF entry that marks the beginning of the trailer.
+   * On return the stream is positioned right after the EOF entry.
+   *
+   * @param stream the WAL input stream
+   * @param startPos the lowest position at which the trailer jump block may start
+   * @param size the total size of the file
+   * @return a trailer carrying the version read and the position of the tracker
+   * @throws InvalidWALDataException if the trailer is missing, or its version, magic,
+   *         offset or leading EOF entry are not valid
+   * @throws IOException if the read fails for any other reason
+   */
+  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
+      throws IOException {
+    long trailerPos = size - TRAILER_JUMP_SIZE; // Beginning of the Trailer Jump
+
+    if (trailerPos < startPos) {
+      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
+    }
+
+    stream.seek(trailerPos);
+    int version = stream.read();
+    if (version != TRAILER_VERSION) {
+      throw new InvalidWALDataException("Invalid Trailer version. got " + version +
+          " expected " + TRAILER_VERSION);
+    }
+
+    long magic = StreamUtils.readLong(stream);
+    if (magic != TRAILER_MAGIC) {
+      throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
+          " expected " + TRAILER_MAGIC);
+    }
+
+    // The offset comes from disk: the EOF entry it points to is written before
+    // the jump block, so anything outside [0, trailerPos) cannot be valid.
+    long trailerOffset = StreamUtils.readLong(stream);
+    if (trailerOffset < 0 || trailerOffset >= trailerPos) {
+      throw new InvalidWALDataException("Invalid Trailer offset. got " + trailerOffset +
+          " expected a value lower than " + trailerPos);
+    }
+    stream.seek(trailerOffset);
+
+    // readEntry() returns null when there is nothing left to parse
+    ProcedureWALEntry entry = readEntry(stream);
+    if (entry == null || entry.getType() != ProcedureWALEntry.Type.EOF) {
+      throw new InvalidWALDataException("Invalid Trailer begin");
+    }
+
+    ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
+      .setVersion(version)
+      .setTrackerPos(stream.getPos())
+      .build();
+    return trailer;
+  }
+
+  /**
+   * Reads the next length-delimited WAL entry from the stream.
+   *
+   * @param stream the stream positioned at the beginning of an entry
+   * @return the parsed entry; callers in this package treat null as "nothing left to decode"
+   * @throws IOException if the entry cannot be read or parsed
+   */
+  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
+    return ProcedureWALEntry.parseDelimitedFrom(stream);
+  }
+
+  /**
+   * Serializes an entry of the given type into the slot. The entry contains
+   * the procedure followed by the sub-procedures, if any.
+   *
+   * @param slot the destination of the serialized entry
+   * @param type the type of the entry
+   * @param proc the main procedure of the entry
+   * @param subprocs the sub-procedures to append, may be null
+   * @throws IOException if the write fails
+   */
+  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
+      Procedure proc, Procedure[] subprocs) throws IOException {
+    ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
+    builder.setType(type);
+    builder.addProcedure(Procedure.convert(proc));
+    if (subprocs != null) {
+      for (Procedure subproc : subprocs) {
+        builder.addProcedure(Procedure.convert(subproc));
+      }
+    }
+    builder.build().writeDelimitedTo(slot);
+  }
+
+  /**
+   * Writes the insert of a single procedure, stored as an INIT entry.
+   */
+  public static void writeInsert(ByteSlot slot, Procedure proc)
+      throws IOException {
+    writeEntry(slot, ProcedureWALEntry.Type.INIT, proc, null);
+  }
+
+  /**
+   * Writes the insert of a procedure together with its sub-procedures,
+   * stored as an INSERT entry.
+   */
+  public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
+      throws IOException {
+    writeEntry(slot, ProcedureWALEntry.Type.INSERT, proc, subprocs);
+  }
+
+  /**
+   * Writes the update of a single procedure, stored as an UPDATE entry.
+   */
+  public static void writeUpdate(ByteSlot slot, Procedure proc)
+      throws IOException {
+    writeEntry(slot, ProcedureWALEntry.Type.UPDATE, proc, null);
+  }
+
+  /**
+   * Writes a DELETE entry, which carries only the id of the deleted procedure.
+   */
+  public static void writeDelete(ByteSlot slot, long procId)
+      throws IOException {
+    ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
+    builder.setType(ProcedureWALEntry.Type.DELETE);
+    builder.setProcId(procId);
+    builder.build().writeDelimitedTo(slot);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
new file mode 100644
index 0000000..a60b8f5
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+
+/**
+ * Helper class that loads the procedures stored in a WAL.
+ * The same reader instance is fed one log after the other; the procedures
+ * collected so far are available through {@link #getProcedures()}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureWALFormatReader {
+  private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
+
+  private final ProcedureStoreTracker tracker;
+
+  // Deserialized procedures, accumulated across all the logs read so far
+  private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
+  // Still-serialized procedures of the log currently being read
+  private final Map<Long, ProcedureProtos.Procedure> localProcedures =
+    new HashMap<Long, ProcedureProtos.Procedure>();
+
+  private long maxProcId = 0;
+
+  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
+    this.tracker = tracker;
+  }
+
+  /**
+   * Reads the entries of the given log until an EOF entry or the end of the data.
+   * An IOException raised while reading is logged and reported through
+   * {@code loader.markCorruptedWAL()}; whatever was read before is still processed.
+   * If the log left no procedure to load, {@code loader.removeLog()} is invoked,
+   * otherwise the procedures read are deserialized and added to the result.
+   *
+   * @param log the WAL file to read; its stream is taken from {@code log.getStream()}
+   * @param loader the callbacks used to report corrupted or no longer needed logs
+   */
+  public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
+    FSDataInputStream stream = log.getStream();
+    try {
+      boolean reachedEof = false;
+      while (!reachedEof) {
+        ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
+        if (entry == null) {
+          LOG.warn("nothing left to decode. exiting with missing EOF");
+          break;
+        }
+        switch (entry.getType()) {
+          case INIT:
+            readInitEntry(entry);
+            break;
+          case INSERT:
+            readInsertEntry(entry);
+            break;
+          case UPDATE:
+          case COMPACT:
+            readUpdateEntry(entry);
+            break;
+          case DELETE:
+            readDeleteEntry(entry);
+            break;
+          case EOF:
+            reachedEof = true;
+            break;
+          default:
+            throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("got an exception while reading the procedure WAL: " + log, e);
+      loader.markCorruptedWAL(log, e);
+    }
+
+    if (localProcedures.isEmpty()) {
+      LOG.info("No active entry found in state log " + log + ". removing it");
+      loader.removeLog(log);
+      return;
+    }
+
+    // Drain the per-log map, deserializing each procedure into the result map
+    Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> pending =
+      localProcedures.entrySet().iterator();
+    while (pending.hasNext()) {
+      Map.Entry<Long, ProcedureProtos.Procedure> serialized = pending.next();
+      pending.remove();
+      procedures.put(serialized.getKey(), Procedure.convert(serialized.getValue()));
+    }
+
+    // TODO: Some procedure may be already runnables (see readInitEntry())
+    //       (we can also check the "update map" in the log trackers)
+  }
+
+  /**
+   * @return an iterator over the procedures deserialized so far
+   */
+  public Iterator<Procedure> getProcedures() {
+    return procedures.values().iterator();
+  }
+
+  /**
+   * Records every procedure of the entry that is still required, and marks it
+   * as not deleted in the tracker. The max procedure id seen is updated for
+   * every procedure, required or not.
+   */
+  private void loadEntries(final ProcedureWALEntry entry) {
+    for (ProcedureProtos.Procedure proto: entry.getProcedureList()) {
+      final long procId = proto.getProcId();
+      maxProcId = Math.max(maxProcId, procId);
+      if (!isRequired(procId)) {
+        continue;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("read " + entry.getType() + " entry " + procId);
+      }
+      localProcedures.put(procId, proto);
+      tracker.setDeleted(procId, false);
+    }
+  }
+
+  private void readInitEntry(final ProcedureWALEntry entry)
+      throws IOException {
+    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
+    // TODO: Make it runnable, before reading other files
+    loadEntries(entry);
+  }
+
+  private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
+    assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
+    loadEntries(entry);
+  }
+
+  private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
+    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
+    loadEntries(entry);
+  }
+
+  /**
+   * Drops the procedure referenced by the entry from the current log
+   * and marks it as deleted in the tracker.
+   */
+  private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
+    assert entry.getProcedureCount() == 0 : "Expected no procedures";
+    assert entry.hasProcId() : "expected ProcID";
+    final long procId = entry.getProcId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read delete entry " + procId);
+    }
+    maxProcId = Math.max(maxProcId, procId);
+    localProcedures.remove(procId);
+    tracker.setDeleted(procId, true);
+  }
+
+  private boolean isDeleted(final long procId) {
+    return ProcedureStoreTracker.DeleteState.YES == tracker.isDeleted(procId);
+  }
+
+  /**
+   * A procedure is required when the tracker does not report it as deleted
+   * and it has not already been loaded.
+   */
+  private boolean isRequired(final long procId) {
+    if (isDeleted(procId)) {
+      return false;
+    }
+    return !procedures.containsKey(procId);
+  }
+}
\ No newline at end of file


Mime
View raw message