hbase-commits mailing list archives

From ecl...@apache.org
Subject [11/19] hbase git commit: HBASE-12476 HydraBase consensus protocol
Date Tue, 25 Nov 2014 20:29:04 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AggregateTimer.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AggregateTimer.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AggregateTimer.java
new file mode 100644
index 0000000..d10ef7a
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AggregateTimer.java
@@ -0,0 +1,93 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+/**
+ * A Timer that multiplexes multiple ConstituentTimer objects and invokes
+ * their callbacks when required.
+ *
+ * The main benefit over RepeatingTimer is that it does not need a separate
+ * thread for each timer.
+ */
+public class AggregateTimer {
+  public static final Log LOG = LogFactory.getLog(AggregateTimer.class);
+  private ScheduledExecutorService executor;
+  Set<ConstituentTimer> timers;
+
+  public class TimerEvent implements Runnable {
+    final ConstituentTimer timer;
+    private boolean cancelled = false;
+
+    public TimerEvent(ConstituentTimer timer) {
+      this.timer = timer;
+    }
+
+    public synchronized void cancel() {
+      cancelled = true;
+    }
+
+    @Override
+    public synchronized void run() {
+      try {
+        if (cancelled || timer.isStopped()) {
+          return;
+        }
+
+        timer.onTimeOut();
+        if (!timer.isStopped()) {
+          schedule(this);
+        }
+      } catch (Exception e) {
+        LOG.error("Timer caught an unknown exception ", e);
+        throw e;
+      }
+    }
+  }
+
+  public AggregateTimer() {
+    this.timers = new ConcurrentSkipListSet<>();
+    this.executor = Executors.newSingleThreadScheduledExecutor(
+      new DaemonThreadFactory("aggregate-timer"));
+  }
+
+  public ConstituentTimer createTimer(
+    String timerName, final long delay, final TimeUnit unit,
+    final TimeoutEventHandler callback) {
+    ConstituentTimer timer =
+      new ConstituentTimer(this, timerName, delay, unit, callback);
+    submitNewTimerEvent(timer);
+
+    return timer;
+  }
+
+  public TimerEvent submitNewTimerEvent(final ConstituentTimer timer) {
+    if (!timer.isStopped()) {
+      TimerEvent event = new TimerEvent(timer);
+      schedule(event);
+      return event;
+    }
+    return null;
+  }
+
+  void schedule(TimerEvent event) {
+    executor.schedule(event,
+      event.timer.getDelayMillis() + event.timer.getBackOffInterval(),
+      TimeUnit.MILLISECONDS);
+  }
+
+  public void shutdown() {
+    executor.shutdown();
+  }
+
+  public void shutdownNow() {
+    executor.shutdownNow();
+  }
+}
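
For orientation, here is a minimal usage sketch (not part of the patch). It assumes only the classes in this commit: AggregateTimer, ConstituentTimer, and the single-method TimeoutEventHandler callback interface implemented later in this patch. Two logical timers share the one "aggregate-timer" daemon thread:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.consensus.quorum.AggregateTimer;
    import org.apache.hadoop.hbase.consensus.quorum.ConstituentTimer;
    import org.apache.hadoop.hbase.consensus.quorum.TimeoutEventHandler;

    public class AggregateTimerSketch {
      public static void main(String[] args) throws InterruptedException {
        AggregateTimer aggregateTimer = new AggregateTimer();

        // Two logical timers, both multiplexed onto the single
        // "aggregate-timer" daemon thread.
        ConstituentTimer heartbeat = aggregateTimer.createTimer(
            "heartbeat", 100, TimeUnit.MILLISECONDS,
            new TimeoutEventHandler() {
              @Override
              public void onTimeout() {
                System.out.println("heartbeat fired");
              }
            });
        ConstituentTimer progress = aggregateTimer.createTimer(
            "progress", 500, TimeUnit.MILLISECONDS,
            new TimeoutEventHandler() {
              @Override
              public void onTimeout() {
                System.out.println("progress timeout fired");
              }
            });

        // Timers are created stopped; start() schedules the first event.
        heartbeat.start();
        progress.start();
        Thread.sleep(1000);

        heartbeat.stop();
        progress.stop();
        aggregateTimer.shutdown();
      }
    }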

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSession.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSession.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSession.java
new file mode 100644
index 0000000..ec46247
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSession.java
@@ -0,0 +1,247 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a session for an in-progress AppendRequest, and holds
+ * information such as whether the session is complete, majority-acked, or
+ * failed for some reason.
+ */
+public class AppendConsensusSession implements AppendConsensusSessionInterface {
+  private static Logger LOG = LoggerFactory.getLogger(AppendConsensusSession.class);
+  private final int majorityCount;
+  private final EditId id;
+  private final AppendRequest request;
+  private final ReplicateEntriesEvent replicateEntriesEvent;
+
+  /** The key of ackMap is the peer host name; the value is the time elapsed until the ack */
+  private final Map<String, Long> ackMap = new HashMap<>();
+  private final Set<String> lagSet = new HashSet<>();
+  private final Set<String> highTermSet = new HashSet<>();
+  private final Set<String> peers;
+  private final ConsensusMetrics metrics;
+  private long sessionStartTime;
+
+  private final int currentRank;
+
+  /** Indicates whether any higher-rank peer has caught up on the transactions
+   * and is potentially ready to take over the leadership. */
+  private boolean isHigherRankPeerCaughtup = false;
+
+  private final boolean enableStepDownOnHigherRankCaughtUp;
+
+  private static final long WRITE_OUTLIERS_DEFAULT_MS = 5; // 5 ms
+
+  private SessionResult currentResult = SessionResult.NOT_COMPLETED;
+
+  private static final int APPEND_RETRY_MAX = 5;
+
+  private int tries = 0;
+  private final int maxTries;
+
+  ImmutableRaftContext c;
+
+  /**
+   * Constructs an AppendConsensusSession.
+   * @param c The immutable raft context of the current peer.
+   * @param majorityCount The majority number of the quorum size.
+   * @param request The append request.
+   * @param event The ReplicateEntriesEvent corresponding to this append request.
+   * @param metrics The consensus metrics to update.
+   * @param rank The rank of the current peer.
+   * @param enableStepDownOnHigherRankCaughtUp Whether to step down voluntarily
+   *                                           if a higher rank peer has caught up.
+   * @param maxTries The maximum number of retries before the session times out.
+   * @param peers The peers whose responses count towards this session.
+   */
+  public AppendConsensusSession(ImmutableRaftContext c,
+                                int majorityCount,
+                                final AppendRequest request,
+                                final ReplicateEntriesEvent event,
+                                final ConsensusMetrics metrics,
+                                final int rank,
+                                final boolean enableStepDownOnHigherRankCaughtUp,
+                                final int maxTries,
+                                final Set<String> peers) {
+    this.c = c;
+    this.majorityCount = majorityCount;
+    this.request = request;
+    assert request.validateFields();
+    assert request.logCount() == 1;
+    this.id = request.getLogId(0).clone(); //??
+    this.replicateEntriesEvent = event;
+    this.metrics = metrics;
+    this.currentRank = rank;
+    this.enableStepDownOnHigherRankCaughtUp = enableStepDownOnHigherRankCaughtUp;
+    this.peers = peers;
+    this.maxTries = maxTries;
+    this.sessionStartTime = System.nanoTime();
+    resetInvariants();
+  }
+
+  @Override
+  public void reset() {
+    resetInvariants();
+
+    if (++tries >= APPEND_RETRY_MAX) {
+      this.request.enableTraceable();
+    }
+  }
+
+  private void resetInvariants() {
+    ackMap.clear();
+    lagSet.clear();
+    highTermSet.clear();
+    isHigherRankPeerCaughtup = false;
+    currentResult = SessionResult.NOT_COMPLETED;
+  }
+
+  @Override
+  public boolean isComplete() {
+    return !getResult().equals(SessionResult.NOT_COMPLETED);
+  }
+
+  private void traceLogAppendOutliers() {
+    long elapsed = System.nanoTime() - sessionStartTime;
+    if (elapsed > TimeUnit.MILLISECONDS.toNanos(WRITE_OUTLIERS_DEFAULT_MS)) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("AppendConsensusSession outlier: " + request.toString() +
+        " took " + elapsed + " ns; [");
+      for (Map.Entry<String, Long> entry : ackMap.entrySet()) {
+        sb.append(entry.getKey() + " -> " + entry.getValue() + " ; ");
+      }
+      sb.append("]");
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(sb.toString());
+      }
+    }
+  }
+
+  private void generateResult() {
+    if (!currentResult.equals(SessionResult.NOT_COMPLETED)) {
+      return;
+    }
+
+    // Check if the majority has been reached and I (the leader) have ACK-ed too.
+    // We do this because we cannot serve reads until the leader has ACK-ed.
+    // If I am not part of the peers, my vote doesn't matter.
+    if (ackMap.size() >= majorityCount
+      && (!peers.contains(c.getMyAddress()) ||
+      ackMap.containsKey(c.getMyAddress()))) {
+      long elapsed = System.nanoTime() - sessionStartTime;
+      metrics.getAppendEntriesLatency().add(elapsed, TimeUnit.NANOSECONDS);
+
+      if (isStepDownOnHigherRankCaughtUpEnabled() && isHigherRankPeerCaughtup) {
+        // When at least one higher-rank peer has caught up on all the
+        // transactions, the current leader needs to step down voluntarily.
+        metrics.incHigherRankCaughtUpStepDown();
+        currentResult = SessionResult.STEP_DOWN;
+        LOG.debug("Going to step down voluntarily from leadership");
+      } else {
+        // Otherwise, return majority acked.
+        if (LOG.isTraceEnabled()) {
+          traceLogAppendOutliers();
+        }
+        currentResult = SessionResult.MAJORITY_ACKED;
+      }
+
+    } else if (highTermSet.size() >= majorityCount) {
+      metrics.incAppendEntriesStepDown();
+      currentResult = SessionResult.STEP_DOWN;
+    } else if (lagSet.size() + highTermSet.size() >= majorityCount) {
+      metrics.incAppendEntriesRetries();
+      currentResult = SessionResult.RETRY;
+    }
+  }
+
+  private boolean isStepDownOnHigherRankCaughtUpEnabled() {
+    return enableStepDownOnHigherRankCaughtUp;
+  }
+
+  @Override
+  public SessionResult getResult() {
+    generateResult();
+    if (request.isTraceable()) {
+      LOG.debug(String.format("[AppendRequest Trace] %s and current result is %s",
+        request.toString(), currentResult.toString()));
+    }
+    return currentResult;
+  }
+
+  @Override
+  public EditId getSessionId() {
+    return id;
+  }
+
+  @Override
+  public void cancel() {
+    this.metrics.incHeartBeatCanceled();
+    this.currentResult = SessionResult.CANCELED;
+  }
+
+  @Override
+  public boolean isTimeout() {
+    boolean timedout = tries >= maxTries;
+    if (timedout) {
+      LOG.info(String.format("Append Request (%s) timed out", this.request.toString()));
+    }
+    return timedout;
+  }
+
+  @Override
+  public AppendRequest getAppendRequest() {
+    return request;
+  }
+
+  @Override
+  public ReplicateEntriesEvent getReplicateEntriesEvent() {
+    return replicateEntriesEvent;
+  }
+
+  @Override
+  public void incrementAck(final EditId id, final String address, final int rank,
+                           boolean canTakeover) {
+    assert this.id.equals(id);
+    if (peers.contains(address)) {
+      ackMap.put(address, System.nanoTime() - sessionStartTime);
+      if (rank > currentRank && canTakeover) {
+        isHigherRankPeerCaughtup = true;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Append Request (%s) received a higher" +
+            "rank ack from %s", request.toString(), address));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void incrementHighTermCnt(final EditId id, final String address) {
+    assert this.id.equals(id);
+    if (peers.contains(address)) {
+      highTermSet.add(address);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Append Request (%s) received a higher term" +
+            "response from %s", request.toString(), address));
+      }
+    }
+  }
+
+  @Override
+  public void incrementLagCnt(final EditId id, final String address) {
+    assert this.id.equals(id);
+    if (peers.contains(address)) {
+      lagSet.add(address);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Append Request (%s) received a nak response " +
+          "from %s", request.toString(), address));
+      }
+    }
+  }
+}
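
The decision rule in generateResult() above can be illustrated standalone. The sketch below mirrors its three result branches (the voluntary step-down when a higher-rank peer catches up, and the metrics calls, are omitted); the peer names and counts are hypothetical:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class AppendDecisionSketch {
      enum Result { NOT_COMPLETED, MAJORITY_ACKED, STEP_DOWN, RETRY }

      // Mirrors AppendConsensusSession.generateResult(): the request is
      // majority-acked only once a majority of peers have acked AND the
      // leader itself has acked (if the leader is part of the peer set).
      static Result decide(Set<String> acked, Set<String> highTerm,
                           Set<String> lagging, String leader,
                           Set<String> peers, int majority) {
        if (acked.size() >= majority
            && (!peers.contains(leader) || acked.contains(leader))) {
          return Result.MAJORITY_ACKED;
        } else if (highTerm.size() >= majority) {
          return Result.STEP_DOWN;
        } else if (lagging.size() + highTerm.size() >= majority) {
          return Result.RETRY;
        }
        return Result.NOT_COMPLETED;
      }

      public static void main(String[] args) {
        Set<String> peers = new HashSet<>(Arrays.asList("p1", "p2", "p3"));
        Set<String> acked = new HashSet<>(Arrays.asList("p1", "p2"));
        // 2 of 3 acked, including the leader p1: MAJORITY_ACKED.
        System.out.println(decide(acked, new HashSet<String>(),
            new HashSet<String>(), "p1", peers, 2));
      }
    }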

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSessionInterface.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSessionInterface.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSessionInterface.java
new file mode 100644
index 0000000..44c483b
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendConsensusSessionInterface.java
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+
+public interface AppendConsensusSessionInterface extends ConsensusSession {
+  AppendRequest getAppendRequest();
+  boolean isTimeout();
+  ReplicateEntriesEvent getReplicateEntriesEvent();
+  void incrementAck(final EditId id, final String address, final int rank,
+                    boolean canTakeover);
+  void incrementHighTermCnt(final EditId id, final String address);
+  void incrementLagCnt(final EditId id, final String address);
+  void reset();
+  void cancel();
+}
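
For context, a sketch of the accounting loop a caller of this interface would run for each peer response. The ResponseKind classification and the method name are hypothetical; the real code derives this information from the RPC response:

    package org.apache.hadoop.hbase.consensus.quorum;

    import org.apache.hadoop.hbase.consensus.protocol.EditId;

    public class AppendAccountingSketch {
      // Hypothetical classification of a peer's response.
      enum ResponseKind { ACK, LAGGING, HIGH_TERM }

      static void onPeerResponse(AppendConsensusSessionInterface session,
                                 EditId id, String peer, int peerRank,
                                 boolean canTakeover, ResponseKind kind) {
        switch (kind) {
          case ACK:
            session.incrementAck(id, peer, peerRank, canTakeover);
            break;
          case LAGGING:
            session.incrementLagCnt(id, peer);
            break;
          case HIGH_TERM:
            session.incrementHighTermCnt(id, peer);
            break;
        }
        if (session.isComplete()) {
          // Act on session.getResult(): commit, retry, or step down.
        }
      }
    }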

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendResponseCallBack.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendResponseCallBack.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendResponseCallBack.java
new file mode 100644
index 0000000..12b9e79
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AppendResponseCallBack.java
@@ -0,0 +1,26 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.apache.hadoop.hbase.consensus.rpc.AppendResponse;
+
+public class AppendResponseCallBack implements FutureCallback<AppendResponse> {
+  private static Logger LOG = LoggerFactory.getLogger(AppendResponseCallBack.class);
+  private ConsensusSession session;
+
+  public AppendResponseCallBack(ConsensusSession session) {
+    this.session = session;
+  }
+
+  @Override
+  public void onSuccess(AppendResponse response) {
+    // No-op.
+  }
+
+  @Override
+  public void onFailure(Throwable arg0) {
+    // No-op.
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConsensusSession.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConsensusSession.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConsensusSession.java
new file mode 100644
index 0000000..448dcf8
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConsensusSession.java
@@ -0,0 +1,9 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+
+public interface ConsensusSession {
+  boolean isComplete();
+  SessionResult getResult();
+  EditId getSessionId();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConstituentTimer.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConstituentTimer.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConstituentTimer.java
new file mode 100644
index 0000000..8b11a32
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ConstituentTimer.java
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that is part of an AggregateTimer, which holds several such timers
+ * and invokes the callbacks on its child timers as required.
+ *
+ * The timer retains the behavior of a regular timer, and the user need not
+ * know that it does not own an exclusive thread.
+ */
+public class ConstituentTimer implements Timer {
+  public static final Log LOG = LogFactory.getLog(ConstituentTimer.class);
+  private volatile long delayMillis;
+  private volatile long backOffInterval;
+
+  private volatile boolean isStopped = true;
+
+  AggregateTimer aggregateTimer;
+  TimeoutEventHandler callback;
+  AggregateTimer.TimerEvent timerEvent;
+
+  String timerName;
+
+  /**
+   * @param aggregateTimer The AggregateTimer object to use.
+   * @param timerName The name of this timer.
+   * @param delay The delay between timeouts.
+   * @param timeUnit The time unit of the delay.
+   * @param callback The callback to register.
+   */
+  public ConstituentTimer(AggregateTimer aggregateTimer,
+                          String timerName,
+                          long delay,
+                          TimeUnit timeUnit,
+                          TimeoutEventHandler callback) {
+    this.aggregateTimer = aggregateTimer;
+    this.callback = callback;
+    this.delayMillis = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
+    this.backOffInterval = 0;
+    this.timerName = timerName;
+  }
+
+  @Override
+  public synchronized void start() {
+    if (isStopped) {
+      isStopped = false;
+      timerEvent = aggregateTimer.submitNewTimerEvent(this);
+      backOffInterval = 0;
+    }
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (!isStopped) {
+      isStopped = true;
+      timerEvent.cancel();
+      timerEvent = null;
+    }
+  }
+
+  @Override
+  public synchronized void reset() {
+    if (!isStopped) {
+      // Reset happens by proactively removing and inserting the timer event
+      // again.
+      timerEvent.cancel();
+      timerEvent = aggregateTimer.submitNewTimerEvent(this);
+      backOffInterval = 0;
+    }
+  }
+
+  @Override
+  public synchronized void shutdown() {
+    stop();
+  }
+
+  @Override
+  public synchronized void backoff(long backOffTime, TimeUnit units) {
+    backOffInterval = TimeUnit.MILLISECONDS.convert(backOffTime, units);
+  }
+
+  @Override
+  public synchronized void setDelay(long delay, TimeUnit unit) {
+    delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
+  }
+
+  public void onTimeOut() {
+    callback.onTimeout();
+  }
+
+  public long getDelayMillis() {
+    return delayMillis;
+  }
+
+  public long getBackOffInterval() {
+    return backOffInterval;
+  }
+
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+  public String getTimerName() {
+    return timerName;
+  }
+}
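
Note that AggregateTimer.schedule() adds getBackOffInterval() to the base delay, so a backoff() call stretches rescheduled timeouts until start() or reset() clears it. A small sketch (the timer name and delays are hypothetical):

    package org.apache.hadoop.hbase.consensus.quorum;

    import java.util.concurrent.TimeUnit;

    public class TimerBackoffSketch {
      public static void main(String[] args) {
        AggregateTimer aggregateTimer = new AggregateTimer();
        ConstituentTimer election = aggregateTimer.createTimer(
            "election", 150, TimeUnit.MILLISECONDS,
            new TimeoutEventHandler() {
              @Override
              public void onTimeout() {
                System.out.println("election timeout");
              }
            });

        election.start();
        // After the currently pending event fires, subsequent events are
        // scheduled 150 ms + 300 ms out.
        election.backoff(300, TimeUnit.MILLISECONDS);
        // reset() cancels the pending event, resubmits the timer, and
        // clears the backoff, so later timeouts fire every 150 ms again.
        election.reset();
        aggregateTimer.shutdown();
      }
    }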

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/HeartbeatTimeoutCallback.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/HeartbeatTimeoutCallback.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/HeartbeatTimeoutCallback.java
new file mode 100644
index 0000000..77f8718
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/HeartbeatTimeoutCallback.java
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import java.nio.ByteBuffer;
+
+public class HeartbeatTimeoutCallback implements TimeoutEventHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(
+          HeartbeatTimeoutCallback.class);
+  private final RaftQuorumContext replica;
+  private static final ReplicateEntriesEvent HEARTBEAT_EVENT = new ReplicateEntriesEvent(true,
+      ByteBuffer.allocate(1));
+
+  public HeartbeatTimeoutCallback(final RaftQuorumContext replica) {
+    this.replica = replica;
+  }
+
+  @Override
+  public void onTimeout() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("HeartBeat Triggered on " + replica);
+    }
+
+    // When there is no append request for a long time, in order to avoid
+    // progress timeouts, we offer heartbeats which are no-ops.
+    replica.getConsensusMetrics().incHeartBeatTimeouts();
+    replica.offerEvent(HEARTBEAT_EVENT);
+  }
+}
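
A sketch of how this callback plugs into the timer machinery from AggregateTimer/ConstituentTimer above. The helper and the 100 ms period are assumptions for illustration, not values from this patch:

    package org.apache.hadoop.hbase.consensus.quorum;

    import java.util.concurrent.TimeUnit;

    public class HeartbeatWiringSketch {
      // Hypothetical helper: attach the heartbeat callback to a shared
      // AggregateTimer for a given quorum.
      static ConstituentTimer wireHeartbeat(AggregateTimer aggregateTimer,
                                            RaftQuorumContext replica) {
        ConstituentTimer timer = aggregateTimer.createTimer(
            "heartbeat-" + replica.getQuorumName(),
            100, TimeUnit.MILLISECONDS,
            new HeartbeatTimeoutCallback(replica));
        timer.start();  // offers a no-op heartbeat event on each timeout
        return timer;
      }
    }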

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java
new file mode 100644
index 0000000..fdfa4bd
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.consensus.log.CommitLogManagerInterface;
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.rpc.PeerStatus;
+import org.apache.hadoop.hbase.regionserver.RaftEventListener;
+
+import java.util.Map;
+
+/**
+ * Declares a set of immutable methods which can be used to make decisions
+ * during various events in the state machine.
+ */
+public interface ImmutableRaftContext {
+  /**
+   * Tells whether this peer is currently the leader.
+   * @return true if this peer is the leader
+   */
+  boolean isLeader();
+
+  /**
+   * Tells whether this peer is currently a candidate.
+   * @return true if this peer is a candidate
+   */
+  boolean isCandidate();
+
+  /**
+   * Tells whether this peer is currently a follower.
+   * @return true if this peer is a follower
+   */
+  boolean isFollower();
+
+  /**
+   * Returns the current {term, index} for the state.
+   * @return the current edit id
+   */
+  EditId getCurrentEdit();
+
+  /**
+   * Returns the last committed {term, index} for the state.
+   * @return the last committed edit id
+   */
+  EditId getCommittedEdit();
+
+  /**
+   * Returns the last round's {term, index}.
+   * @return the previous edit id
+   */
+  EditId getPreviousEdit();
+
+  /**
+   * Gets the current leader's information.
+   * @return the current leader host
+   */
+  ConsensusHost getLeader();
+
+  /**
+   * Returns the ID of the current server.
+   * @return the address of this server
+   */
+  String getMyAddress();
+
+  /**
+   * Returns the majority count for the current quorum, including the current
+   * server.
+   * @return the majority count
+   */
+  int getMajorityCnt();
+
+  /**
+   * Gets the id of the last peer we voted for.
+   * @return the host we last voted for
+   */
+  ConsensusHost getLastVotedFor();
+
+  /**
+   * Returns the outstanding append session.
+   * @return the outstanding append session, if any
+   */
+  AppendConsensusSessionInterface getOutstandingAppendSession();
+
+  /**
+   * Returns the outstanding append session if it matches the given edit.
+   * @param id the edit id to match
+   * @return the matching append session, if any
+   */
+  AppendConsensusSessionInterface getAppendSession(final EditId id);
+
+  /**
+   * Returns the outstanding election session.
+   * @return the outstanding election session, if any
+   */
+  VoteConsensusSessionInterface getOutstandingElectionSession();
+
+  /**
+   * Returns the outstanding election session if it matches the given edit.
+   * @param id the edit id to match
+   * @return the matching election session, if any
+   */
+  VoteConsensusSessionInterface getElectionSession(final EditId id);
+
+  /**
+   * Tells whether the transaction log is accessible.
+   * @return true if the transaction log is accessible
+   */
+  boolean isLogAccessible();
+
+  int getRanking();
+
+  boolean validateLogEntry(final EditId id);
+
+  String getQuorumName();
+
+  Configuration getConf();
+
+  EditId getLastLogIndex();
+
+  void stop(boolean wait);
+
+  CommitLogManagerInterface getLogManager();
+
+  QuorumInfo getQuorumInfo();
+
+  RaftEventListener getDataStoreEventListener();
+
+  long getMinUnPersistedIndexAcrossQuorum();
+
+  ConsensusMetrics getConsensusMetrics();
+
+  Map<HServerAddress,Integer> getNewConfiguration();
+
+  QuorumMembershipChangeRequest getUpdateMembershipRequest();
+
+  long getPurgeIndex();
+
+  PeerStatus getStatus();
+
+  int getAppendEntriesMaxTries();
+
+  long getLastAppendRequestReceivedTime();
+
+  int getNumPendingEvents();
+
+  boolean isPartOfNewQuorum();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointAppendConsensusSession.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointAppendConsensusSession.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointAppendConsensusSession.java
new file mode 100644
index 0000000..23c77e0
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointAppendConsensusSession.java
@@ -0,0 +1,97 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+
+import java.util.Set;
+
+public class JointAppendConsensusSession implements AppendConsensusSessionInterface {
+
+  private AppendConsensusSession oldConfigSession;
+  private AppendConsensusSession newConfigSession;
+
+  public JointAppendConsensusSession(ImmutableRaftContext c,
+                                int majorityCount,
+                                final AppendRequest request,
+                                final ReplicateEntriesEvent event,
+                                final ConsensusMetrics metrics,
+                                final int rank,
+                                final boolean enableStepDownOnHigherRankCaughtUp,
+                                final int maxTries,
+                                final Set<String> oldPeers,
+                                final Set<String> newPeers) {
+    oldConfigSession = new AppendConsensusSession(c, majorityCount, request,
+      event, metrics, rank, enableStepDownOnHigherRankCaughtUp,
+            maxTries, oldPeers);
+    newConfigSession = new AppendConsensusSession(c, majorityCount, request,
+      event, metrics, rank, enableStepDownOnHigherRankCaughtUp,
+            maxTries, newPeers);
+  }
+
+  @Override
+  public boolean isComplete() {
+    return oldConfigSession.isComplete() && newConfigSession.isComplete();
+  }
+
+  @Override
+  public SessionResult getResult() {
+    if (oldConfigSession.getResult() != newConfigSession.getResult()) {
+      return SessionResult.NOT_COMPLETED;
+    }
+
+    return newConfigSession.getResult();
+  }
+
+  @Override
+  public EditId getSessionId() {
+    return oldConfigSession.getSessionId();
+  }
+
+  @Override
+  public void incrementAck(final EditId id, final String address, final int rank,
+                           boolean canTakeover) {
+    oldConfigSession.incrementAck(id, address, rank, canTakeover);
+    newConfigSession.incrementAck(id, address, rank, canTakeover);
+  }
+
+  @Override
+  public void incrementHighTermCnt(final EditId id, final String address) {
+    oldConfigSession.incrementHighTermCnt(id, address);
+    newConfigSession.incrementHighTermCnt(id, address);
+  }
+
+  @Override
+  public void incrementLagCnt(final EditId id, final String address) {
+    oldConfigSession.incrementLagCnt(id, address);
+    newConfigSession.incrementLagCnt(id, address);
+  }
+
+  @Override
+  public ReplicateEntriesEvent getReplicateEntriesEvent() {
+    return oldConfigSession.getReplicateEntriesEvent();
+  }
+
+  @Override
+  public void reset() {
+    oldConfigSession.reset();
+    newConfigSession.reset();
+  }
+
+  @Override
+  public AppendRequest getAppendRequest() {
+    return oldConfigSession.getAppendRequest();
+  }
+
+  @Override
+  public void cancel() {
+    oldConfigSession.cancel();
+    newConfigSession.cancel();
+  }
+
+  @Override
+  public boolean isTimeout() {
+    return oldConfigSession.isTimeout() || newConfigSession.isTimeout();
+  }
+}
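
The joint-consensus rule enforced here — a result counts only when the old and the new configuration reach it independently — can be illustrated standalone. In the sketch below the majority is computed per config for clarity, whereas the patch passes a single majorityCount to both inner sessions; the peer sets are hypothetical:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class JointMajoritySketch {
      // An edit is acked under joint consensus only if it has a majority
      // in the old config AND a majority in the new config, mirroring
      // JointAppendConsensusSession.getResult().
      static boolean jointlyAcked(Set<String> acked,
                                  Set<String> oldPeers, Set<String> newPeers) {
        return ackedIn(acked, oldPeers) && ackedIn(acked, newPeers);
      }

      static boolean ackedIn(Set<String> acked, Set<String> peers) {
        Set<String> intersection = new HashSet<>(acked);
        intersection.retainAll(peers);
        return intersection.size() > peers.size() / 2;
      }

      public static void main(String[] args) {
        Set<String> oldPeers = new HashSet<>(Arrays.asList("a", "b", "c"));
        Set<String> newPeers = new HashSet<>(Arrays.asList("b", "c", "d"));
        Set<String> acked = new HashSet<>(Arrays.asList("a", "b"));
        // Majority of {a,b,c} but not of {b,c,d}: not yet acked.
        System.out.println(jointlyAcked(acked, oldPeers, newPeers)); // false
        acked.add("c");
        System.out.println(jointlyAcked(acked, oldPeers, newPeers)); // true
      }
    }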

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java
new file mode 100644
index 0000000..a8830fa
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java
@@ -0,0 +1,179 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.consensus.util.RaftUtil;
+import org.apache.hadoop.hbase.consensus.fsm.Event;
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.consensus.server.peer.PeerServer;
+import org.apache.hadoop.hbase.consensus.server.peer.events.PeerServerEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Class to handle peers, leader elections, and transactions while a quorum
+ * change is in progress. In particular, it knows about both the old and the
+ * new config and makes sure that all decisions are made in agreement with
+ * both configs.
+ *
+ * TODO: Get rid of oldPeerServers/newPeerServers notion.
+ */
+public class JointConsensusPeerManager extends AbstractPeerManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+    JointConsensusPeerManager.class);
+
+  // Peers in the old config
+  private Map<String, PeerServer> oldPeerServers;
+
+  // Peers in the new config
+  private Map<String, PeerServer> newPeerServers;
+
+  // Contains all the peers involved in both the configs
+  private Map<String, PeerServer> allPeers;
+
+  private final QuorumInfo newConfig;
+
+  public JointConsensusPeerManager(final MutableRaftContext c,
+                                   final QuorumInfo newConfig) {
+    super(c);
+    this.newConfig = newConfig;
+    allPeers = new HashMap<>();
+  }
+
+  @Override
+  public void initializePeers() {
+
+    // Initialize the old peers
+    if (oldPeerServers == null) {
+      oldPeerServers = super.initializePeers(c.getQuorumInfo().getPeersWithRank());
+    }
+    allPeers.putAll(oldPeerServers);
+
+    // Initialize the new peers
+    if (newPeerServers == null) {
+      newPeerServers = new HashMap<>();
+      Map<HServerAddress, Integer> newPeers = new HashMap<>();
+
+      // There can be an overlap between the new and old configurations.
+      // Hence, we should initialize a peer only once, so let's remove the
+      // peers which are already initialized.
+      newPeers.putAll(newConfig.getPeersWithRank());
+      Iterator<Map.Entry<HServerAddress, Integer>> newPeerIterator =
+        newPeers.entrySet().iterator();
+      HServerAddress peerAddress;
+      while (newPeerIterator.hasNext()) {
+        Map.Entry<HServerAddress, Integer> e = newPeerIterator.next();
+        peerAddress = e.getKey();
+        int newPeerRank = e.getValue();
+        String consensusServerAddress =
+          RaftUtil.getLocalConsensusAddress(peerAddress).getHostAddressWithPort();
+        if (oldPeerServers.get(consensusServerAddress) != null) {
+          PeerServer oldPeerServer = oldPeerServers.get(consensusServerAddress);
+          oldPeerServer.setRank(newPeerRank);
+          newPeerServers.put(consensusServerAddress, oldPeerServer);
+          oldPeerServers.remove(consensusServerAddress);
+          newPeerIterator.remove();
+        }
+      }
+
+      // Initialize the remaining peers
+      final Map<String, PeerServer> newServers = super.initializePeers(newPeers);
+      newPeerServers.putAll(newServers);
+      allPeers.putAll(newServers);
+    }
+  }
+
+  @Override
+  public void setPeerServers(Map<String, PeerServer> peers) {
+    oldPeerServers = peers;
+    newPeerServers = peers;
+  }
+
+  public void setOldPeerServers(Map<String, PeerServer> peers) {
+    oldPeerServers = peers;
+  }
+
+  public Map<String, PeerServer> getNewPeerServers() {
+    return newPeerServers;
+  }
+
+  @Override
+  public Map<String, PeerServer> getPeerServers() {
+    return allPeers;
+  }
+
+  @Override
+  public void resetPeers() {
+    super.resetPeers(allPeers);
+  }
+
+  @Override
+  public void setPeerReachable(String address) {
+    PeerServer server = allPeers.get(address);
+    if (server != null) {
+      server.enqueueEvent(new Event(PeerServerEventType.PEER_REACHABLE));
+    }
+  }
+
+  @Override
+  public void sendVoteRequestToQuorum(VoteRequest request) {
+    super.broadcastVoteRequest(allPeers, request);
+  }
+
+  @Override
+  public void sendAppendRequestToQuorum(AppendRequest request) {
+    LOG.info("Sending an appendRequest to quorum " + c.getQuorumName() +
+      " via the JointConsensusPeerManager ");
+    super.broadcastAppendRequest(allPeers, request);
+  }
+
+  @Override
+  public void stop() {
+    super.stop(allPeers);
+  }
+
+  public void stopOldPeers() {
+    for (String peer : oldPeerServers.keySet()) {
+      if (newPeerServers.get(peer) == null) {
+        oldPeerServers.get(peer).stop();
+      }
+    }
+  }
+
+  @Override
+  public String getState() {
+    return super.getState(allPeers);
+  }
+
+  @Override
+  public AppendConsensusSessionInterface createAppendConsensusSession(
+          int majorityCount, AppendRequest request, ReplicateEntriesEvent event,
+          ConsensusMetrics metrics, int rank,
+          boolean enableStepDownOnHigherRankCaughtUp) {
+    return new JointAppendConsensusSession(c, majorityCount, request, event,
+      metrics, rank, enableStepDownOnHigherRankCaughtUp,
+      c.getAppendEntriesMaxTries(), c.getQuorumInfo().getPeersAsString(),
+      newConfig.getPeersAsString());
+  }
+
+  @Override
+  public VoteConsensusSessionInterface createVoteConsensusSession(
+    int majorityCount, VoteRequest request, ConsensusMetrics metrics) {
+    return new JointVoteConsensusSession(majorityCount, request, metrics,
+      c.getQuorumInfo().getPeersAsString(), newConfig.getPeersAsString());
+  }
+
+  public List<QuorumInfo> getConfigs() {
+    return Arrays.asList(c.getQuorumInfo(), newConfig);
+  }
+
+  @Override
+  public boolean isInJointQuorumMode() {
+    return true;
+  }
+}
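
A condensed sketch of the joint-consensus window this manager implements, using only the methods defined above. The ordering is illustrative, and it assumes an already-constructed MutableRaftContext and target QuorumInfo; the replication in between is elided:

    package org.apache.hadoop.hbase.consensus.quorum;

    public class MembershipChangeSketch {
      static void enterJointMode(MutableRaftContext c, QuorumInfo newConfig) {
        JointConsensusPeerManager manager =
            new JointConsensusPeerManager(c, newConfig);
        // Old and new peers; a peer present in both configs is reused,
        // not initialized twice.
        manager.initializePeers();
        // ... while in joint mode, every append/vote must succeed in both
        // configs (see JointAppendConsensusSession above) ...
        // Once the new config takes effect, drop peers that belong only
        // to the old config.
        manager.stopOldPeers();
      }
    }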

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointVoteConsensusSession.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointVoteConsensusSession.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointVoteConsensusSession.java
new file mode 100644
index 0000000..fb3abe3
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointVoteConsensusSession.java
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+
+import java.util.Set;
+
+public class JointVoteConsensusSession implements VoteConsensusSessionInterface {
+  VoteConsensusSession oldConfigSession;
+  VoteConsensusSession newConfigSession;
+
+  public JointVoteConsensusSession(int majorityCount, final VoteRequest request,
+                                   final ConsensusMetrics metrics,
+                                   final Set<String> oldPeers,
+                                   final Set<String> newPeers) {
+    oldConfigSession = new VoteConsensusSession(majorityCount, request, metrics, oldPeers);
+    newConfigSession = new VoteConsensusSession(majorityCount, request, metrics, newPeers);
+  }
+
+  @Override public void incrementAck(EditId id, String address) {
+    oldConfigSession.incrementAck(id, address);
+    newConfigSession.incrementAck(id, address);
+  }
+
+  @Override public void incrementNack(EditId id, String address) {
+    oldConfigSession.incrementNack(id, address);
+    newConfigSession.incrementNack(id, address);
+  }
+
+  @Override public void setVoteSessionFailed(EditId id) {
+    oldConfigSession.setVoteSessionFailed(id);
+    newConfigSession.setVoteSessionFailed(id);
+  }
+
+  @Override public VoteRequest getRequest() {
+    return oldConfigSession.getRequest();
+  }
+
+  @Override public boolean isComplete() {
+    return oldConfigSession.isComplete() && newConfigSession.isComplete();
+  }
+
+  @Override public SessionResult getResult() {
+    if (oldConfigSession.getResult() != newConfigSession.getResult()) {
+      return SessionResult.NOT_COMPLETED;
+    }
+
+    return oldConfigSession.getResult();
+  }
+
+  @Override public EditId getSessionId() {
+    return oldConfigSession.getSessionId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java
new file mode 100644
index 0000000..9802d21
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.consensus.fsm.Event;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.util.Arena;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Set of methods used by the states in the raft protocol to update the state
+ * machine on various events.
+ */
+public interface MutableRaftContext extends ImmutableRaftContext {
+  /**
+   * Updates the current edit
+   * @param id
+   */
+  void setCurrentEditId(final EditId id);
+
+  /**
+   * Updates the last committed edit id.
+   * @param id
+   */
+  void advanceCommittedIndex(final EditId id);
+
+  /**
+   * Set the leader id.
+   * @param hostId
+   */
+  void setLeader(final ConsensusHost hostId);
+
+  /**
+   * Updates the previous edit id.
+   * @param id
+   */
+  void setPreviousEditId(final EditId id);
+
+  /**
+   * Set the last voted for id.
+   * @param hostId
+   */
+  ListenableFuture<Void> setVotedFor(final ConsensusHost hostId);
+
+  /**
+   * clear the leader id.
+   */
+  void clearLeader();
+
+  /**
+   * Clear the last voted for id.
+   */
+  void clearVotedFor();
+
+  void appendToLog(final EditId currLogId, final long commitIndex,
+                   final ByteBuffer data);
+
+  void setElectionSession(VoteConsensusSessionInterface session);
+
+  void setAppendSession(AppendConsensusSessionInterface session);
+
+  void sendVoteRequestToQuorum(VoteRequest request);
+
+  void truncateLogEntries(final EditId lastValidEntryId) throws IOException;
+  boolean offerEvent(final Event e);
+
+  Timer getHeartbeatTimer();
+
+  Timer getProgressTimer();
+
+  ListenableFuture<?> sendAppendRequest(ReplicateEntriesEvent event);
+
+  void setLastAppendRequestReceivedTime(long timeMillis);
+
+  ListenableFuture<?> sendEmptyAppendRequest();
+
+  void leaderStepDown();
+
+  void candidateStepDown();
+
+  void resendOutstandingAppendRequest();
+
+  void resetPeers();
+
+  void setPeerReachable(String address);
+
+  String getLeaderNotReadyMsg();
+
+  void updatePeerAckedId(String address, EditId remoteEdit);
+
+  void setMinAckedIndexAcrossAllPeers(long index);
+
+  void setUpdateMembershipRequest(
+    QuorumMembershipChangeRequest request);
+
+  PeerManagerInterface getPeerManager();
+
+  HServerAddress getServerAddress();
+
+  void updateToJointQuorumMembership(final QuorumInfo config) throws IOException;
+
+  void updateToNewQuorumMembership(final QuorumInfo config)
+    throws IOException;
+
+  void handleQuorumChangeRequest(final ByteBuffer buffer) throws IOException;
+
+  Arena getArena();
+
+  void reseedStartIndex(long index) throws IOException;
+
+  void setQuorumInfo(final QuorumInfo update);
+
+  void cleanUpJointStates();
+
+  boolean canTakeOver();
+
+  ExecutorService getExecServiceForThriftClients();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/PeerManagerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/PeerManagerInterface.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/PeerManagerInterface.java
new file mode 100644
index 0000000..536a4ae
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/PeerManagerInterface.java
@@ -0,0 +1,39 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.consensus.server.peer.PeerServer;
+
+import java.util.List;
+import java.util.Map;
+
+public interface PeerManagerInterface {
+  void initializePeers();
+  void setPeerServers(final Map<String, PeerServer> peers);
+  Map<String, PeerServer> getPeerServers();
+  void resetPeers();
+  void setPeerReachable(String address);
+  void sendVoteRequestToQuorum(VoteRequest request);
+  void sendAppendRequestToQuorum(AppendRequest request);
+  void updatePeerAckedId(String address, EditId remoteEdit);
+  long getMinUnPersistedIndexAcrossQuorum();
+  void setMinAckedIndexAcrossAllPeers(long index);
+  void stop();
+  String getState();
+  AppendConsensusSessionInterface createAppendConsensusSession(int majorityCount,
+                                                     final AppendRequest request,
+                                                     final ReplicateEntriesEvent event,
+                                                     final ConsensusMetrics metrics,
+                                                     final int rank,
+                                                     final boolean enableStepDownOnHigherRankCaughtUp);
+  VoteConsensusSessionInterface createVoteConsensusSession(
+    int majorityCount,
+    final VoteRequest request,
+    final ConsensusMetrics metrics);
+
+  List<QuorumInfo> getConfigs();
+  boolean isInJointQuorumMode();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ProgressTimeoutCallback.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ProgressTimeoutCallback.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ProgressTimeoutCallback.java
new file mode 100644
index 0000000..9cabd76
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ProgressTimeoutCallback.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.consensus.raft.events.ProgressTimeoutEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+public class ProgressTimeoutCallback implements TimeoutEventHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(
+    ProgressTimeoutCallback.class);
+  private final RaftQuorumContext replica;
+
+  public ProgressTimeoutCallback(final RaftQuorumContext replica) {
+    this.replica = replica;
+  }
+
+  @Override
+  public void onTimeout() {
+    LOG.info(replica + " has a progress timeout!" +
+      " Current edit: " +
+      replica.getCurrentEdit() + ", last AppendRequest was received at: " +
+      new Date(replica.getLastAppendRequestReceivedTime()));
+
+    if (System.currentTimeMillis() - replica.getLastAppendRequestReceivedTime() >=
+      replica.getProgressTimeoutForMeMillis()) {
+      replica.getConsensusMetrics().incProgressTimeouts();
+      replica.offerEvent(new ProgressTimeoutEvent());
+    } else {
+      LOG.info(replica + " Ignoring the progress timer.");
+    }
+  }
+}
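
The guard in onTimeout() — re-checking the actual elapsed time before acting — matters because the timer can fire even though an AppendRequest arrived after it was armed. A standalone sketch of the predicate (the values are hypothetical):

    public class ProgressGuardSketch {
      // Escalate only if no AppendRequest arrived within the timeout window.
      static boolean shouldEscalate(long nowMillis, long lastAppendMillis,
                                    long timeoutMillis) {
        return nowMillis - lastAppendMillis >= timeoutMillis;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(shouldEscalate(now, now - 5000, 3000)); // true
        System.out.println(shouldEscalate(now, now - 1000, 3000)); // false
      }
    }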

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumAgent.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumAgent.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumAgent.java
new file mode 100644
index 0000000..8120dd2
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumAgent.java
@@ -0,0 +1,463 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.NoLeaderForRegionException;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.consensus.exceptions.CommitQueueOverloadedException;
+import org.apache.hadoop.hbase.consensus.exceptions.NewLeaderException;
+import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.raft.events.ReplicateEntriesEvent;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.metrics.TimeStat;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * This is an agent that runs on the RaftQuorumContext side, and it is
+ * responsible for getting entries committed within a particular time window.
+ *
+ * There is a continuously running 'WAL Syncer' task, which will take a list
+ * of WALEdits to sync on the quorum.
+ */
+public class QuorumAgent implements ConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(QuorumAgent.class);
+
+  private volatile RaftQuorumContext context;
+
+  /** The period of time the agent will wait for a commit to replicate. */
+  private volatile long commitTimeoutInMillis;
+
+  /** The period of time after which the syncCommit method is to return. */
+  private volatile long commitDeadlineInMillis;
+
+  /**
+   * The maximum number of entries allowed in the commit queue. Appends beyond
+   * this number will be fast failed until the queue is drained.
+   */
+  private volatile long commitQueueEntriesLimit;
+
+  /**
+   * The maximum size of the commit queue in KB. Appends will be fast failed
+   * once the queue reaches this limit until it is drained.
+   */
+  private volatile long commitQueueSizeLimit;
+
+  /** The interval between retries */
+  private volatile long sleepIntervalInMillis;
+
+  private final Compression.Algorithm compressionCodec;
+
+  // Lock to guarantee the ordering of log entries in WAL
+  private final ReentrantLock appendLock = new ReentrantLock(true);
+  private final Condition groupCommitBuffer = appendLock.newCondition();
+
+  private LinkedList<WALEdit> currentList = new LinkedList<>();
+  private LinkedList<WALEdit> syncList = new LinkedList<>();
+
+  private SettableFuture<Long> currentResult;
+  private SettableFuture<Long> futureResult = SettableFuture.create();
+
+  private final ExecutorService executor;
+  private volatile boolean isSyncStopped = false;
+
+  private volatile long lastSequenceID = -1;
+
+  private Random random = new Random();
+
+  public QuorumAgent(RaftQuorumContext context) {
+    this.context = context;
+    Configuration conf = context.getConf();
+
+    commitQueueEntriesLimit = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_ENTRIES_LIMIT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_ENTRIES_LIMIT_DEFAULT);
+    commitQueueSizeLimit = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_SIZE_LIMIT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_SIZE_LIMIT_DEFAULT);
+    commitTimeoutInMillis = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_TIMEOUT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_TIMEOUT_DEFAULT);
+    commitDeadlineInMillis = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_DEADLINE_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_DEADLINE_DEFAULT);
+    sleepIntervalInMillis = conf.getLong(
+      HConstants.QUORUM_CLIENT_SLEEP_INTERVAL_KEY,
+      HConstants.QUORUM_CLIENT_SLEEP_INTERVAL_DEFAULT);
+
+    compressionCodec = Compression.getCompressionAlgorithmByName(conf.get(
+      HConstants.CONSENSUS_TRANSACTION_LOG_COMPRESSION_CODEC_KEY,
+      HConstants.CONSENSUS_TRANSACTION_LOG_COMPRESSION_CODEC_DEFAULT));
+
+    executor = Executors.newSingleThreadExecutor(
+      new DaemonThreadFactory("Quorum-Syncer-"+ context.getQuorumName() + "-"));
+    submitWALSyncerTask();
+  }
+
+  private void setCommitQueueLimits(final Configuration conf) {
+    long newCommitQueueEntriesLimit = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_ENTRIES_LIMIT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_ENTRIES_LIMIT_DEFAULT);
+    long newCommitQueueSizeLimit = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_SIZE_LIMIT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_QUEUE_SIZE_LIMIT_DEFAULT);
+
+    if (commitQueueEntriesLimit != newCommitQueueEntriesLimit) {
+      commitQueueEntriesLimit = newCommitQueueEntriesLimit;
+      LOG.debug("Set commit queue entries limit for region: %s, entries: %d",
+        getRaftQuorumContext().getQuorumName(), commitQueueEntriesLimit);
+    }
+    if (commitQueueSizeLimit != newCommitQueueSizeLimit) {
+      commitQueueSizeLimit = newCommitQueueSizeLimit;
+      LOG.debug("Set commit queue size limit for region: %s, size: %d",
+        getRaftQuorumContext().getQuorumName(), commitQueueSizeLimit);
+    }
+  }
+
+  private void setCommitQueueTimings(final Configuration conf) {
+    long newCommitTimeoutInMillis = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_TIMEOUT_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_TIMEOUT_DEFAULT);
+    long newCommitDeadlineInMillis = conf.getLong(
+      HConstants.QUORUM_AGENT_COMMIT_DEADLINE_KEY,
+      HConstants.QUORUM_AGENT_COMMIT_DEADLINE_DEFAULT);
+    long newSleepIntervalInMillis = conf.getLong(
+      HConstants.QUORUM_CLIENT_SLEEP_INTERVAL_KEY,
+      HConstants.QUORUM_CLIENT_SLEEP_INTERVAL_DEFAULT);
+
+    if (commitTimeoutInMillis != newCommitTimeoutInMillis) {
+      commitTimeoutInMillis = newCommitTimeoutInMillis;
+      LOG.debug("Set commit timeout for region: %s, %d ms",
+        getRaftQuorumContext().getQuorumName(), commitTimeoutInMillis);
+    }
+    if (commitDeadlineInMillis != newCommitDeadlineInMillis) {
+      commitDeadlineInMillis = newCommitDeadlineInMillis;
+      LOG.debug("Set commit deadline for region: %s, %d ms",
+        getRaftQuorumContext().getQuorumName(), commitDeadlineInMillis);
+    }
+    if (sleepIntervalInMillis != newSleepIntervalInMillis) {
+      sleepIntervalInMillis = newSleepIntervalInMillis;
+      LOG.debug("Set commit sleep interval for region: %s, %d ms",
+        getRaftQuorumContext().getQuorumName(), sleepIntervalInMillis);
+    }
+  }
+
+  public RaftQuorumContext getRaftQuorumContext() {
+    return this.context;
+  }
+
+  @Override
+  public void notifyOnChange(Configuration conf) {
+    setCommitQueueLimits(conf);
+    setCommitQueueTimings(conf);
+  }
+
+  private long getBackoffTimeMillis() {
+    double p50 = getRaftQuorumContext().getConsensusMetrics().
+      getAppendEntriesLatency().getP50();
+    // Get a random number of microseconds, up to half the p50.
+    int randomMicros = random.nextInt(Math.max(1, (int) (p50 / 2.0)));
+    return (long) Math.max(1.0, (p50 + randomMicros) / 1000.0);
+  }
+
+  public boolean isLeader() {
+    return context.isLeader();
+  }
+
+  public String getPath() {
+    return context.getLogManager().getPath();
+  }
+
+  private void checkBeforeCommit() throws IOException {
+    // check whether the current peer is the leader
+    if (!isLeader()) {
+      throw new NoLeaderForRegionException("Current region server " +
+        context.getMyAddress() +
+        " is not the leader for the region " + context.getQuorumName());
+    }
+
+    if (this.isSyncStopped) {
+      throw new IOException("QuorumWAL syncer thread for " + context.getQuorumName() +
+        " has been stopped !");
+    }
+  }
+
+  private ListenableFuture<Long> internalCommit(List<WALEdit> edits)
+    throws IOException {
+    SettableFuture<Long> future = null;
+
+    // Add the transaction into the group commit queue
+    this.appendLock.lock();
+    try {
+      if (isSyncStopped) {
+        throw new IOException("QuorumWAL syncer thread for " +
+          context.getQuorumName() + " has been stopped!");
+      }
+      if (currentList.size() > commitQueueEntriesLimit) {
+        getRaftQuorumContext().getConsensusMetrics()
+          .incCommitQueueEntriesLimitExceeded();
+        throw new CommitQueueOverloadedException(String.format(
+          "Exceeded entries limit for region: %s, limit: %d, entries: %d",
+          context.getQuorumName(), commitQueueEntriesLimit,
+          currentList.size()), getBackoffTimeMillis());
+      }
+
+      currentList.addAll(edits);
+
+      this.groupCommitBuffer.signal();
+      future = futureResult;
+    } finally {
+      this.appendLock.unlock();
+    }
+    return future;
+  }
+
+  private ListenableFuture<Long> internalCommit(WALEdit edits)
+          throws IOException {
+    return internalCommit(Arrays.asList(edits));
+  }
+
+  /**
+   * Append to the log synchronously.
+   * @param edits WALEdit to append.
+   * @return The commit index of the committed edit.
+   * @throws IOException
+   */
+  public long syncAppend(WALEdit edits) throws IOException {
+    checkBeforeCommit();
+
+    // increase the write size
+    AbstractWAL.getWriteSizeHistogram().addValue(edits.getTotalKeyValueLength());
+
+    long start = System.nanoTime();
+    ListenableFuture<Long> future = internalCommit(edits);
+    // Wait for the group commit to finish
+    try {
+      long seq = future.get();
+      // increase the sync time
+      double syncMicros = (System.nanoTime() - start) / 1000.0;
+      getRaftQuorumContext().getConsensusMetrics().getFsSyncLatency()
+        .add((long)syncMicros, TimeUnit.MICROSECONDS);
+      AbstractWAL.getSyncTimeHistogram().addValue(syncMicros);
+      return seq;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Append to the log asynchronously.
+   * @param edits WALEdit to append.
+   * @return Future containing the commit index.
+   * @throws IOException if this peer is not the leader or the syncer thread
+   *         has been stopped
+   */
+  public ListenableFuture<Long> asyncAppend(WALEdit edits) throws IOException {
+    checkBeforeCommit();
+    return internalCommit(edits);
+  }
+
+  /**
+   * Same as asyncAppend(WALEdit), but for a list of WALEdits.
+   * @param edits WALEdits to append.
+   * @return Future containing the commit index for the list of WALEdits.
+   * @throws IOException if this peer is not the leader or the syncer thread
+   *         has been stopped
+   */
+  public ListenableFuture<Long> asyncAppend(List<WALEdit> edits) throws IOException {
+    checkBeforeCommit();
+    return internalCommit(edits);
+  }
+
+  public long getLastSequenceID() {
+    return lastSequenceID;
+  }
+
+  /**
+   * Stop the RaftQuorumContext. Called by an external RegionClosing thread
+   * pool.
+   */
+  public void close() {
+    this.isSyncStopped = true;
+    this.executor.shutdown();
+    context.stop(true);
+
+    appendLock.lock();
+    try {
+      if (futureResult != null) {
+        futureResult.setException(new IOException("WAL already closed"));
+      }
+      if (currentResult != null) {
+        currentResult.setException(new IOException("WAL already closed"));
+      }
+    } finally {
+      appendLock.unlock();
+    }
+  }
+
+  /**
+   * A blocking API to replicate the entries to the quorum. It waits until
+   * the commit succeeds or fails. This method is guaranteed to allow a
+   * 'commitTimeoutInMillis' period for the commit to complete while
+   * maintaining a strict deadline of 'commitDeadlineInMillis' on when this
+   * method completes.
+   *
+   * @param entries the list of WALEdit objects to replicate
+   * @param result the future that is completed when the commit finishes
+   * @return the commit index of the replicated entries.
+   * @throws Exception if the quorum misses the deadline or throws during
+   *         the replication
+   */
+  private long syncCommit(List<WALEdit> entries,
+                          final SettableFuture<Long> result) throws Exception {
+    ByteBuffer serializedEntries;
+    ConsensusMetrics metrics = getRaftQuorumContext().getConsensusMetrics();
+
+    try (TimeStat.BlockTimer latency =
+                 metrics.getLogSerializationLatency().time()) {
+      serializedEntries = WALEdit.serializeToByteBuffer(entries,
+              System.currentTimeMillis(), compressionCodec);
+    }
+    int appendEntriesSize = WALEdit.getWALEditsSize(entries);
+    metrics.getAppendEntriesSize().add(appendEntriesSize);
+    metrics.getAppendEntriesBatchSize().add(entries.size());
+    if (!compressionCodec.equals(Compression.Algorithm.NONE)) {
+      int compressedSize = serializedEntries.remaining() -
+              WALEdit.PAYLOAD_HEADER_SIZE;
+      metrics.getAppendEntriesCompressedSize().add(compressedSize);
+    } else {
+      // We don't use any compression, so the compressed size would be the
+      // same as the original size.
+      metrics.getAppendEntriesCompressedSize().add(appendEntriesSize);
+    }
+
+    if (!context.isLeader()) {
+      ConsensusHost leader = context.getLeader();
+      throw new NewLeaderException(
+        leader == null ? "No leader" : leader.getHostId());
+    }
+
+    ReplicateEntriesEvent event = new ReplicateEntriesEvent(false,
+      serializedEntries, result);
+    if (!context.offerEvent(event)) {
+      ConsensusHost leader = context.getLeader();
+      throw new NewLeaderException(
+        leader == null ? "No leader" : leader.getHostId());
+    }
+    try {
+      return result.get(commitDeadlineInMillis, TimeUnit.MILLISECONDS);
+    } catch (Throwable e) {
+      if (e instanceof TimeoutException) {
+        metrics.incAppendEntriesMissedDeadline();
+        LOG.warn(String.format(
+          "%s Failed to commit within the deadline of %dms", context,
+          commitDeadlineInMillis));
+        throw e;
+      } else {
+        LOG.error(context + " Quorum commit failed", e);
+        // Chain the cause instead of flattening it into the message string.
+        throw new Exception("Quorum commit failed", e);
+      }
+    }
+  }
+
+  private void submitWALSyncerTask() {
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        while (!isSyncStopped) {
+          try {
+            SettableFuture<Long> nextResult = SettableFuture.create();
+            // Switch the current list under the appendLock
+            appendLock.lock();
+            try {
+              if (isSyncStopped) {
+                throw new IOException("QuorumWAL syncer thread for " +
+                  context.getQuorumName() + " has been stopped !");
+              }
+
+              if (currentList.isEmpty()) {
+                // wake up every 100ms to check if sync thread has to shut down
+                groupCommitBuffer.await(100, TimeUnit.MILLISECONDS);
+              }
+
+              if (!currentList.isEmpty()) {
+                // switch the buffer
+                assert syncList.isEmpty();
+                LinkedList<WALEdit> tmp = syncList;
+                syncList = currentList;
+                currentList = tmp;
+
+                // Create a new futureResult for the next queue
+                currentResult = futureResult;
+                futureResult = nextResult;
+              } else {
+                continue;
+              }
+
+            } catch (Exception e) {
+              // Unexpected; log it instead of silently swallowing it.
+              LOG.error("WAL syncer caught unexpected exception for " +
+                context.getQuorumName(), e);
+            } finally {
+              appendLock.unlock();
+            }
+
+            // Group commit to the quorum
+            long groupCommitID;
+            long start = System.nanoTime();
+            try {
+              groupCommitID = syncCommit(syncList, currentResult);
+
+              // Set the last commitID
+              assert groupCommitID > lastSequenceID;
+              lastSequenceID = groupCommitID;
+
+            } catch (Throwable e) {
+              // Signal all the rpc threads with the exception
+              currentResult.setException(e);
+            } finally {
+              // Clear the sync buffer
+              syncList.clear();
+            }
+            // Add the group sync time
+            double gsyncMicros = (System.nanoTime() - start) / 1000.0;
+            getRaftQuorumContext().getConsensusMetrics().getFsGSyncLatency()
+              .add((long) gsyncMicros, TimeUnit.MICROSECONDS);
+            AbstractWAL.getGSyncTimeHistogram().addValue(gsyncMicros);
+          } catch (Throwable e) {
+            LOG.error("Unexpected exception: ", e);
+          }
+        }
+      }
+    });
+  }
+
+  public void setLastSequenceID(long seqid)
+    throws IOException, ExecutionException, InterruptedException {
+    lastSequenceID = seqid;
+    context.reseedIndex(seqid);
+  }
+
+  public long getLastCommittedIndex() {
+    return context.getLogManager().getLastValidTransactionId().getIndex();
+  }
+
+  public Compression.Algorithm getCompressionCodec() {
+    return compressionCodec;
+  }
+}

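The syncer task above implements a double-buffered group commit: producers
append to currentList under appendLock and all share the futureResult of the
next flush, while a single syncer thread swaps the two lists and completes
the shared future once per batch. A minimal, self-contained sketch of the
same pattern follows; it is not part of the patch, every name in it is
invented, and String stands in for WALEdit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.SettableFuture;

    class GroupCommitSketch {
      private final ReentrantLock appendLock = new ReentrantLock();
      private final Condition buffered = appendLock.newCondition();
      private List<String> currentList = new ArrayList<>();
      private List<String> syncList = new ArrayList<>();
      private SettableFuture<Long> futureResult = SettableFuture.create();
      private long sequence = 0;

      /** Producers enqueue an edit and share the future of the next flush. */
      ListenableFuture<Long> append(String edit) {
        appendLock.lock();
        try {
          currentList.add(edit);
          buffered.signal();
          return futureResult;
        } finally {
          appendLock.unlock();
        }
      }

      /** The single syncer thread swaps buffers and commits whole batches. */
      void syncOnce() throws InterruptedException {
        SettableFuture<Long> batchResult;
        appendLock.lock();
        try {
          if (currentList.isEmpty()) {
            buffered.await(100, TimeUnit.MILLISECONDS);
          }
          if (currentList.isEmpty()) {
            return;                               // nothing to flush this round
          }
          List<String> tmp = syncList;            // swap the buffers under the lock
          syncList = currentList;
          currentList = tmp;
          batchResult = futureResult;             // hand out a fresh future for
          futureResult = SettableFuture.create(); // the next batch
        } finally {
          appendLock.unlock();
        }
        sequence += syncList.size();  // stand-in for the actual quorum commit
        batchResult.set(sequence);    // wakes every producer in the batch at once
        syncList.clear();
      }
    }

The point of the swap is that producers block only for the O(1) list
exchange; the expensive quorum round trip runs outside the lock.
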
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java
new file mode 100644
index 0000000..66e5406
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java
@@ -0,0 +1,357 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.consensus.util.RaftUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.*;
+import java.nio.ByteBuffer;
+
+public class QuorumInfo {
+  public static final int PAYLOAD_HEADER_SIZE =
+          Bytes.SIZEOF_BYTE     // Magic value
+          + Bytes.SIZEOF_BYTE   // Payload type
+          + Bytes.SIZEOF_BYTE;  // Payload version
+
+  private Map<String, Map<HServerAddress, Integer>> peers = null;
+  private Map<HServerAddress, Integer> peersWithRank = null;
+  private Set<String> peersAsString = null;
+  private final String quorumName;
+
+  public QuorumInfo(final Map<String, Map<HServerAddress, Integer>> peers,
+                    final String quorumName) {
+    this.peers = peers;
+    this.quorumName = quorumName;
+    populateInternalMaps();
+  }
+
+  public QuorumInfo(final QuorumInfo info) {
+    this.quorumName = info.quorumName;
+    peers = new HashMap<>();
+    for (String domain : info.getPeers().keySet()) {
+      final Map<HServerAddress, Integer> peersInDomain = new HashMap<>();
+      for (HServerAddress peer : info.getPeers().get(domain).keySet()) {
+        peersInDomain.put(new HServerAddress(peer.getHostname(), peer.getPort()),
+          info.getPeersWithRank().get(peer));
+      }
+      peers.put(domain, peersInDomain);
+    }
+    populateInternalMaps();
+  }
+
+  public int getQuorumSize() {
+    return (peers == null ? 0 : peers.values().iterator().next().size());
+  }
+
+  public Map<HServerAddress,Integer> getPeersWithRank() {
+    return peersWithRank;
+  }
+
+  public Set<String> getPeersAsString() {
+    return peersAsString;
+  }
+
+  public Map<HServerAddress, String> getPeersWithCluster() {
+    if (peers != null) {
+      // TODO: Consider caching this map instead of computing it every time
+      Map<HServerAddress, String> peersWithCluster = new TreeMap<HServerAddress, String>();
+      for (Map.Entry<String, Map<HServerAddress, Integer>> entry : peers
+        .entrySet()) {
+        String cluster = entry.getKey();
+        for (HServerAddress serverAddress : entry.getValue().keySet()) {
+          peersWithCluster.put(serverAddress, cluster);
+        }
+      }
+      return peersWithCluster;
+    }
+    return null;
+  }
+
+  public Map<String, Map<HServerAddress, Integer>> getPeers() {
+    return peers;
+  }
+
+  public void setPeers(Map<String, Map<HServerAddress, Integer>> peers) {
+    this.peers = peers;
+    populateInternalMaps();
+  }
+
+  public String getQuorumName() {
+    return quorumName;
+  }
+
+  public static ByteBuffer serializeToBuffer(final List<QuorumInfo> configs) {
+    final ByteBuffer payload = ByteBuffer.allocate(
+      getPayloadSize(configs) + PAYLOAD_HEADER_SIZE);
+
+    // Write the MAGIC VALUE
+    payload.put(HConstants.CONSENSUS_PAYLOAD_MAGIC_VALUE);
+
+    // Write that the payload is Quorum Membership Change
+    payload.put(HConstants.QUORUM_MEMBERSHIP_CHANGE_TYPE);
+
+    // Write the version of Quorum Membership Change
+    payload.put(HConstants.QUORUM_MEMBERSHIP_CHANGE_VERSION);
+
+    // Write the total number of quorum configs
+    payload.putInt(configs.size());
+
+    byte[] quorumName, dcName, currPeerInfo = null;
+    for (QuorumInfo s : configs) {
+      // Quorum Name
+      quorumName = s.getQuorumName().getBytes();
+      payload.putInt(quorumName.length);
+      payload.put(quorumName);
+
+      // Num of DC's
+      payload.putInt(s.getPeers().size());
+      for (String dc : s.getPeers().keySet()) {
+        // DC Name
+        dcName = dc.getBytes();
+        payload.putInt(dcName.length);
+        payload.put(dcName);
+
+        Set<HServerAddress> numPeers = s.getPeers().get(dc).keySet();
+
+        // Number of peers
+        payload.putInt(numPeers.size());
+
+        for (HServerAddress peer : numPeers) {
+          // Peer Info
+          currPeerInfo = peer.getHostAddressWithPort().getBytes();
+          payload.putInt(currPeerInfo.length);
+
+          payload.put(currPeerInfo);
+
+          // Peer Rank
+          payload.putInt(s.getPeers().get(dc).get(peer));
+        }
+      }
+    }
+
+    payload.flip();
+    return payload;
+  }
+
+  /**
+   * Reads the ByteBuffer and returns a valid list of QuorumInfo objects.
+   * This method assumes that the contents of the ByteBuffer are immutable.
+   *
+   * It does not modify the buffer's position, limit, mark, or capacity, so
+   * it should be thread safe.
+   * @param data buffer holding one or more serialized quorum configurations
+   * @return the decoded configurations, or null if the buffer does not hold
+   *         a quorum change request of a known version
+   */
+  public static List<QuorumInfo> deserializeFromByteBuffer(final ByteBuffer data) {
+    if (!isQuorumChangeRequest(data)) {
+      return null;
+    }
+
+    // The check above already read the magic value and type fields, so move on
+    // to the version field.
+    int currOffset = data.position() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_BYTE;
+
+    // Read the version
+    if (data.get(currOffset) != HConstants.QUORUM_MEMBERSHIP_CHANGE_VERSION) {
+      return null;
+    }
+    currOffset += Bytes.SIZEOF_BYTE;
+
+    int numConfigs = data.getInt(currOffset);
+
+    currOffset += Bytes.SIZEOF_INT;
+    List<QuorumInfo> configs = new ArrayList<>();
+
+    int numDCs, numPeers, quorumNameLength, dcNameLength, peerNameLength = 0;
+    String quorumName, dcName, peerName;
+    Map<String, Map<HServerAddress, Integer>> dcLevelInfo;
+    Map<HServerAddress, Integer> perDCPeersMap;
+
+    for (int confIndex = 0; confIndex < numConfigs; ++confIndex) {
+
+      // Quorum Name
+      quorumNameLength = data.getInt(currOffset);
+      currOffset += Bytes.SIZEOF_INT;
+
+      quorumName = new String(data.array(), data.arrayOffset() + currOffset,
+        quorumNameLength);
+      currOffset += quorumNameLength;
+
+      numDCs = data.getInt(currOffset);
+      currOffset += Bytes.SIZEOF_INT;
+
+      // Initialize the dc map
+      dcLevelInfo = new HashMap<>(numDCs);
+
+      for (int dcIndex = 0; dcIndex < numDCs; ++dcIndex) {
+
+        // DC Name
+        dcNameLength = data.getInt(currOffset);
+        currOffset += Bytes.SIZEOF_INT;
+
+        dcName = new String(data.array(), data.arrayOffset() + currOffset,
+          dcNameLength);
+        currOffset += dcNameLength;
+
+        // Num of peers in this DC
+        numPeers = data.getInt(currOffset);
+        currOffset += Bytes.SIZEOF_INT;
+
+        // Initialize the peerMap
+        perDCPeersMap = new HashMap<>(numPeers);
+        for (int peerIndex = 0; peerIndex < numPeers; ++peerIndex) {
+          // Peer Name
+          peerNameLength = data.getInt(currOffset);
+          currOffset += Bytes.SIZEOF_INT;
+
+          peerName = new String(data.array(), data.arrayOffset() + currOffset, peerNameLength);
+          currOffset += peerNameLength;
+
+          // Put the peer name and rank in the peer Map
+          perDCPeersMap.put(new HServerAddress(peerName), data.getInt(currOffset));
+          currOffset += Bytes.SIZEOF_INT;
+        }
+
+        // add the dc info to map
+        dcLevelInfo.put(dcName, perDCPeersMap);
+      }
+
+      // add the config to the list of configs to return
+      configs.add(new QuorumInfo(dcLevelInfo, quorumName));
+    }
+    return configs;
+  }
+
+  public static int getPayloadSize(final List<QuorumInfo> configs) {
+    // Number of Lists
+    int size = Bytes.SIZEOF_INT;
+
+    for (QuorumInfo s : configs) {
+      // Quorum Name length; count bytes, to match serializeToBuffer()
+      size += Bytes.SIZEOF_INT;
+      size += s.getQuorumName().getBytes().length;
+
+      // Num of DC's
+      size += Bytes.SIZEOF_INT;
+
+      for (String dc : s.getPeers().keySet()) {
+        // DC Name length
+        size += Bytes.SIZEOF_INT;
+        size += dc.getBytes().length;
+
+        Set<HServerAddress> numPeers = s.getPeers().get(dc).keySet();
+
+        // Number of peers
+        size += Bytes.SIZEOF_INT;
+
+        for (HServerAddress peer : numPeers) {
+          // Peer Address in String format
+          size += Bytes.SIZEOF_INT;
+          size += peer.getHostAddressWithPort().length();
+          // Peer Rank
+          size += Bytes.SIZEOF_INT;
+        }
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Test whether the given buffer contains a quorum change request. This method
+   * does not change the position pointer while reading the buffer data.
+   * @param data buffer containing a possible quorum change request
+   * @return true if the buffer contains a change request, false otherwise
+   */
+  public static boolean isQuorumChangeRequest(final ByteBuffer data) {
+    int currOffset = data.position();
+
+    // Read the Magic Value
+    if (data.remaining() < PAYLOAD_HEADER_SIZE ||
+      data.get(currOffset) != HConstants.CONSENSUS_PAYLOAD_MAGIC_VALUE) {
+      return false;
+    }
+    currOffset += Bytes.SIZEOF_BYTE;
+
+    // Read the type
+    if (data.get(currOffset) != HConstants.QUORUM_MEMBERSHIP_CHANGE_TYPE) {
+      return false;
+    }
+    return true;
+  }
+
+  public void refresh() {
+    populateInternalMaps();
+  }
+
+  private void populateInternalMaps() {
+    if (peers != null) {
+      peersAsString = new HashSet<>();
+      peersWithRank = new TreeMap<>();
+      for (Map<HServerAddress, Integer> map : peers.values()) {
+        peersWithRank.putAll(map);
+        for (HServerAddress peer : map.keySet()) {
+          peersAsString.add(RaftUtil.getLocalConsensusAddress(peer).getHostAddressWithPort());
+        }
+      }
+    }
+  }
+
+  public String getDomain(final String serverAddr) {
+    for (String c : peers.keySet()) {
+      for (HServerAddress peer : peers.get(c).keySet()) {
+        if (serverAddr.equals(peer.getHostAddressWithPort())) {
+          return c;
+        }
+      }
+    }
+    return "";
+  }
+
+  public int getRank(final HServerAddress address) {
+    int rank = 0;
+    if (peersWithRank.containsKey(address)) {
+      rank = peersWithRank.get(address);
+    }
+    return rank;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (!(o instanceof QuorumInfo)) {
+      return false;
+    }
+    QuorumInfo that = (QuorumInfo)o;
+    if (!this.quorumName.equals(that.quorumName)) {
+      return false;
+    }
+    if (!this.peers.equals(that.peers)) {
+      return false;
+    }
+    if (!this.peersAsString.equals(that.peersAsString)) {
+      return false;
+    }
+    if (!this.peersWithRank.equals(that.peersWithRank)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    // equals() is overridden above, so hashCode() must stay consistent
+    // with it; combine the same fields.
+    return Objects.hash(quorumName, peers, peersAsString, peersWithRank);
+  }
+
+  public boolean hasEqualReplicaSet(final QuorumInfo that) {
+    return this.peersWithRank.keySet().equals(that.peersWithRank.keySet());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{ Quorum Name = %s, peersWithRank = %s }",
+      getQuorumName(), peersWithRank);
+  }
+}

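The wire format above is a 3-byte header (magic value, payload type,
version) followed by a count of configs, each carrying its quorum name,
per-DC peer lists, and ranks. A sketch of a full round trip through
serializeToBuffer() and deserializeFromByteBuffer() follows; it is not part
of the patch, and the class name, hosts, ports, and ranks are invented.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HServerAddress;
    import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo;

    public class QuorumInfoWireExample {
      public static void main(String[] args) {
        // Invented peers and ranks, purely for illustration.
        Map<HServerAddress, Integer> rankByPeer = new HashMap<>();
        rankByPeer.put(new HServerAddress("host1.example.com:60020"), 2);
        rankByPeer.put(new HServerAddress("host2.example.com:60020"), 1);

        Map<String, Map<HServerAddress, Integer>> peersByDC = new HashMap<>();
        peersByDC.put("dc1", rankByPeer);

        QuorumInfo info = new QuorumInfo(peersByDC, "example-quorum");

        // One buffer holds the 3-byte header plus every serialized config.
        ByteBuffer wire = QuorumInfo.serializeToBuffer(Arrays.asList(info));

        // Readers sniff the header first, then decode; neither call moves
        // the buffer's position.
        if (QuorumInfo.isQuorumChangeRequest(wire)) {
          List<QuorumInfo> decoded = QuorumInfo.deserializeFromByteBuffer(wire);
          System.out.println(decoded.get(0)); // equal to `info` by equals()
        }
      }
    }
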
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumMembershipChangeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumMembershipChangeRequest.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumMembershipChangeRequest.java
new file mode 100644
index 0000000..9d1d525
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumMembershipChangeRequest.java
@@ -0,0 +1,38 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+public class QuorumMembershipChangeRequest extends AbstractFuture<Boolean> {
+
+  public enum QuorumMembershipChangeState {
+    PENDING,
+    JOINT_CONFIG_COMMIT_IN_PROGRESS,
+    NEW_CONFIG_COMMIT_IN_PROGRESS,
+    COMPLETE,
+    FAILED
+  }
+
+  final QuorumInfo config;
+  QuorumMembershipChangeState currentState;
+
+  public QuorumMembershipChangeRequest(QuorumInfo config) {
+    this.config = config;
+    currentState = QuorumMembershipChangeState.PENDING;
+  }
+
+  public QuorumInfo getConfig() {
+    return config;
+  }
+
+  public QuorumMembershipChangeState getCurrentState() {
+    return currentState;
+  }
+
+  public void setCurrentState(QuorumMembershipChangeState currentState) {
+    this.currentState = currentState;
+  }
+
+  public void setResponse(boolean b) {
+    set(b);
+  }
+}

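QuorumMembershipChangeRequest doubles as a Guava AbstractFuture: whoever
drives the change advances the state machine and finally calls
setResponse(), which unblocks any caller waiting on get(). A brief usage
sketch follows; it is not part of the patch, and everything in it beyond the
patch's own types is invented.

    import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo;
    import org.apache.hadoop.hbase.consensus.quorum.QuorumMembershipChangeRequest;
    import org.apache.hadoop.hbase.consensus.quorum.QuorumMembershipChangeRequest.QuorumMembershipChangeState;

    public class MembershipChangeExample {
      // `newConfig` comes from the caller; its construction is elided here.
      static boolean runChange(QuorumInfo newConfig) throws Exception {
        QuorumMembershipChangeRequest request =
            new QuorumMembershipChangeRequest(newConfig);

        // The quorum drives the request through its states as the joint and
        // then the final configuration commit.
        request.setCurrentState(
            QuorumMembershipChangeState.JOINT_CONFIG_COMMIT_IN_PROGRESS);
        request.setCurrentState(
            QuorumMembershipChangeState.NEW_CONFIG_COMMIT_IN_PROGRESS);
        request.setCurrentState(QuorumMembershipChangeState.COMPLETE);

        // Completing the AbstractFuture wakes any thread blocked on get().
        request.setResponse(true);
        return request.get();
      }
    }
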
