kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: consensus: Remove Consensus interface
Date Fri, 02 Jun 2017 18:47:09 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 037f72b37 -> e3b7d4dc1


consensus: Remove Consensus interface

We now have only one Consensus implementation (RaftConsensus) and no plans
for additional implementations in the future, so this interface can be
removed.

Change-Id: I21b976cb92619083e1f1766b13634b841b554c8c
Reviewed-on: http://gerrit.cloudera.org:8080/7040
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e3b7d4dc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e3b7d4dc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e3b7d4dc

Branch: refs/heads/master
Commit: e3b7d4dc1a6be020f3f126a66192ca1451e95e6f
Parents: 037f72b
Author: Mike Percy <mpercy@apache.org>
Authored: Wed May 31 21:21:22 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri Jun 2 18:46:53 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   1 -
 src/kudu/consensus/consensus-test-util.h        |   5 +-
 src/kudu/consensus/consensus.cc                 |  81 ----
 src/kudu/consensus/consensus.h                  | 430 -------------------
 src/kudu/consensus/leader_election.h            |   2 +-
 src/kudu/consensus/pending_rounds.cc            |   1 +
 src/kudu/consensus/pending_rounds.h             |   8 +-
 src/kudu/consensus/raft_consensus.cc            |  71 ++-
 src/kudu/consensus/raft_consensus.h             | 357 +++++++++++++--
 .../consensus/raft_consensus_quorum-test.cc     |   4 +-
 .../ts_tablet_manager-itest.cc                  |   4 +-
 src/kudu/master/catalog_manager.cc              |  12 +-
 src/kudu/master/sys_catalog.cc                  |   2 +-
 src/kudu/tablet/tablet_replica.cc               |   4 +-
 src/kudu/tablet/tablet_replica.h                |  10 +-
 src/kudu/tablet/transactions/transaction.h      |   4 +-
 .../tablet/transactions/transaction_driver.cc   |   9 +-
 .../tablet/transactions/transaction_driver.h    |  14 +-
 .../tablet/transactions/write_transaction.h     |   4 -
 src/kudu/tserver/tablet_copy_source_session.cc  |   4 +-
 src/kudu/tserver/tablet_service.cc              |  30 +-
 src/kudu/tserver/ts_tablet_manager.cc           |   8 +-
 src/kudu/tserver/tserver-path-handlers.cc       |   4 +-
 23 files changed, 448 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index e93ea94..ec53919 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -96,7 +96,6 @@ target_link_libraries(log
   consensus_metadata_proto)
 
 set(CONSENSUS_SRCS
-  consensus.cc
   consensus_meta.cc
   consensus_peers.cc
   consensus_queue.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 89707aa..380d82b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -27,7 +27,6 @@
 
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/log.h"
@@ -643,7 +642,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
     CHECK_OK(ThreadPoolBuilder("test-txn-factory").set_max_threads(1).Build(&pool_));
   }
 
-  void SetConsensus(Consensus* consensus) {
+  void SetConsensus(RaftConsensus* consensus) {
     consensus_ = consensus;
   }
 
@@ -673,7 +672,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
 
  private:
   gscoped_ptr<ThreadPool> pool_;
-  Consensus* consensus_;
+  RaftConsensus* consensus_;
   Log* log_;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.cc b/src/kudu/consensus/consensus.cc
deleted file mode 100644
index 0553b5c..0000000
--- a/src/kudu/consensus/consensus.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/consensus/consensus.h"
-
-#include <set>
-
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-
-namespace kudu {
-namespace consensus {
-
-using std::shared_ptr;
-using strings::Substitute;
-
-ConsensusBootstrapInfo::ConsensusBootstrapInfo()
-  : last_id(MinimumOpId()),
-    last_committed_id(MinimumOpId()) {
-}
-
-ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
-  STLDeleteElements(&orphaned_replicates);
-}
-
-ConsensusRound::ConsensusRound(Consensus* consensus,
-                               gscoped_ptr<ReplicateMsg> replicate_msg,
-                               ConsensusReplicatedCallback replicated_cb)
-    : consensus_(consensus),
-      replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
-      replicated_cb_(std::move(replicated_cb)),
-      bound_term_(-1) {}
-
-ConsensusRound::ConsensusRound(Consensus* consensus,
-                               const ReplicateRefPtr& replicate_msg)
-    : consensus_(consensus),
-      replicate_msg_(replicate_msg),
-      bound_term_(-1) {
-  DCHECK(replicate_msg_);
-}
-
-void ConsensusRound::NotifyReplicationFinished(const Status& status) {
-  if (PREDICT_FALSE(replicated_cb_.is_null())) return;
-  replicated_cb_.Run(status);
-}
-
-Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
-  if (PREDICT_FALSE(bound_term_ != -1 &&
-                    bound_term_ != current_term)) {
-    return Status::Aborted(
-      strings::Substitute(
-        "Transaction submitted in term $0 cannot be replicated in term $1",
-        bound_term_, current_term));
-  }
-  return Status::OK();
-}
-
-scoped_refptr<ConsensusRound> Consensus::NewRound(
-    gscoped_ptr<ReplicateMsg> replicate_msg,
-    const ConsensusReplicatedCallback& replicated_cb) {
-  return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb));
-}
-
-} // namespace consensus
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
deleted file mode 100644
index 59910be..0000000
--- a/src/kudu/consensus/consensus.h
+++ /dev/null
@@ -1,430 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDO_QUORUM_CONSENSUS_H_
-#define KUDO_QUORUM_CONSENSUS_H_
-
-#include <boost/optional/optional_fwd.hpp>
-#include <iosfwd>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "kudu/consensus/consensus.pb.h"
-#include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
-
-namespace kudu {
-class MonoDelta;
-
-namespace log {
-class Log;
-struct RetentionIndexes;
-}
-
-namespace master {
-class SysCatalogTable;
-}
-
-namespace server {
-class Clock;
-}
-
-namespace tablet {
-class TabletReplica;
-}
-
-namespace tserver {
-class TabletServerErrorPB;
-}
-
-namespace consensus {
-
-class ConsensusCommitContinuation;
-class ConsensusRound;
-class ReplicaTransactionFactory;
-class TimeManager;
-
-typedef int64_t ConsensusTerm;
-
-typedef StatusCallback ConsensusReplicatedCallback;
-
-struct ConsensusOptions {
-  std::string tablet_id;
-};
-
-// After completing bootstrap, some of the results need to be plumbed through
-// into the consensus implementation.
-struct ConsensusBootstrapInfo {
-  ConsensusBootstrapInfo();
-  ~ConsensusBootstrapInfo();
-
-  // The id of the last operation in the log
-  OpId last_id;
-
-  // The id of the last committed operation in the log.
-  OpId last_committed_id;
-
-  // REPLICATE messages which were in the log with no accompanying
-  // COMMIT. These need to be passed along to consensus init in order
-  // to potentially commit them.
-  //
-  // These are owned by the ConsensusBootstrapInfo instance.
-  std::vector<ReplicateMsg*> orphaned_replicates;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
-};
-
-// The external interface for a consensus peer.
-//
-// Note: Even though Consensus points to Log, it needs to be destroyed
-// after it. See Log class header comment for the reason why. On the other
-// hand Consensus must be quiesced before closing the log, otherwise it
-// will try to write to a destroyed/closed log.
-//
-// The order of these operations on shutdown must therefore be:
-// 1 - quiesce Consensus
-// 2 - close/destroy Log
-// 3 - destroy Consensus
-class Consensus : public RefCountedThreadSafe<Consensus> {
- public:
-  Consensus() {}
-
-  // Starts running the consensus algorithm.
-  virtual Status Start(const ConsensusBootstrapInfo& info) = 0;
-
-  // Returns true if consensus is running.
-  virtual bool IsRunning() const = 0;
-
-  // Emulates a leader election by simply making this peer leader.
-  virtual Status EmulateElection() = 0;
-
-  // Modes for StartElection().
-  enum ElectionMode {
-    // A normal leader election. Peers will not vote for this node
-    // if they believe that a leader is alive.
-    NORMAL_ELECTION,
-
-    // A "pre-election". Peers will vote as they would for a normal
-    // election, except that the votes will not be "binding". In other
-    // words, they will not durably record their vote.
-    PRE_ELECTION,
-
-    // In this mode, peers will vote for this candidate even if they
-    // think a leader is alive. This can be used for a faster hand-off
-    // between a leader and one of its replicas.
-    ELECT_EVEN_IF_LEADER_IS_ALIVE
-  };
-
-  // Reasons for StartElection().
-  enum ElectionReason {
-    // The election is being called because the Raft configuration has only
-    // a single node and has just started up.
-    INITIAL_SINGLE_NODE_ELECTION,
-
-    // The election is being called because the timeout expired. In other
-    // words, the previous leader probably failed (or there was no leader
-    // in this term)
-    ELECTION_TIMEOUT_EXPIRED,
-
-    // The election is being started because of an explicit external request.
-    EXTERNAL_REQUEST
-  };
-
-  // Triggers a leader election.
-  virtual Status StartElection(ElectionMode mode, ElectionReason reason) = 0;
-
-  // Wait until the node has LEADER role.
-  // Returns Status::TimedOut if the role is not LEADER within 'timeout'.
-  virtual Status WaitUntilLeaderForTests(const MonoDelta& timeout) = 0;
-
-  // Implement a LeaderStepDown() request.
-  virtual Status StepDown(LeaderStepDownResponsePB* resp) {
-    return Status::NotSupported("Not implemented.");
-  }
-
-  // Creates a new ConsensusRound, the entity that owns all the data
-  // structures required for a consensus round, such as the ReplicateMsg
-  // (and later on the CommitMsg). ConsensusRound will also point to and
-  // increase the reference count for the provided callbacks.
-  scoped_refptr<ConsensusRound> NewRound(
-      gscoped_ptr<ReplicateMsg> replicate_msg,
-      const ConsensusReplicatedCallback& replicated_cb);
-
-  // Called by a Leader to replicate an entry to the state machine.
-  //
-  // From the leader instance perspective execution proceeds as follows:
-  //
-  //           Leader                               RaftConfig
-  //             +                                     +
-  //     1) Req->| Replicate()                         |
-  //             |                                     |
-  //     2)      +-------------replicate-------------->|
-  //             |<---------------ACK------------------+
-  //             |                                     |
-  //     3)      +--+                                  |
-  //           <----+ round.NotifyReplicationFinished()|
-  //             |                                     |
-  //     3a)     |  +------ update commitIndex ------->|
-  //             |                                     |
-  //
-  // 1) Caller calls Replicate(), method returns immediately to the caller and
-  //    runs asynchronously.
-  //
-  // 2) Leader replicates the entry to the peers using the consensus
-  //    algorithm, proceeds as soon as a majority of voters acknowledges the
-  //    entry.
-  //
-  // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished,
-  //    which calls the ConsensusReplicatedCallback.
-  //
-  // 3a) The leader asynchronously notifies other peers of the new
-  //     commit index, which tells them to apply the operation.
-  //
-  // This method can only be called on the leader, i.e. role() == LEADER
-  virtual Status Replicate(const scoped_refptr<ConsensusRound>& round) = 0;
-
-  // Ensures that the consensus implementation is currently acting as LEADER,
-  // and thus is allowed to submit operations to be prepared before they are
-  // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the
-  // implementation also stores the current term inside the round's "bound_term"
-  // member. When we eventually are about to replicate the transaction, we verify
-  // that the term has not changed in the meantime.
-  virtual Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
-    return Status::OK();
-  }
-
-  // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
-  // state machines. This is equivalent to "AppendEntries()" in Raft
-  // terminology.
-  //
-  // ConsensusRequestPB contains a sequence of 0 or more operations to apply
-  // on the replica. If there are 0 operations the request is considered
-  // 'status-only' i.e. the leader is communicating with the follower only
-  // in order to pass back and forth information on watermarks (eg committed
-  // operation ID, replicated op id, etc).
-  //
-  // If the sequence contains 1 or more operations they will be replicated
-  // in the same order as the leader, and submitted for asynchronous Prepare
-  // in the same order.
-  //
-  // The leader also provides information on the index of the latest
-  // operation considered committed by consensus. The replica uses this
-  // information to update the state of any pending (previously replicated/prepared)
-  // transactions.
-  //
-  // Returns Status::OK if the response has been filled (regardless of accepting
-  // or rejecting the specific request). Returns non-OK Status if a specific
-  // error response could not be formed, which will result in the service
-  // returning an UNKNOWN_ERROR RPC error code to the caller and including the
-  // stringified Status message.
-  virtual Status Update(const ConsensusRequestPB* request,
-                        ConsensusResponsePB* response) = 0;
-
-  // Messages sent from CANDIDATEs to voting peers to request their vote
-  // in leader election.
-  virtual Status RequestVote(const VoteRequestPB* request,
-                             VoteResponsePB* response) = 0;
-
-  // Implement a ChangeConfig() request.
-  virtual Status ChangeConfig(const ChangeConfigRequestPB& req,
-                              const StatusCallback& client_cb,
-                              boost::optional<tserver::TabletServerErrorPB::Code>* error) {
-    return Status::NotSupported("Not implemented.");
-  }
-
-  virtual Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
-                                    tserver::TabletServerErrorPB::Code* error) = 0;
-
-  // Returns the current Raft role of this instance.
-  virtual RaftPeerPB::Role role() const = 0;
-
-  // Returns the uuid of this peer.
-  virtual const std::string& peer_uuid() const = 0;
-
-  // Returns the id of the tablet whose updates this consensus instance helps coordinate.
-  virtual const std::string& tablet_id() const = 0;
-
-  virtual scoped_refptr<TimeManager> time_manager() const = 0;
-
-  // Returns a copy of the state of the consensus system.
-  virtual ConsensusStatePB ConsensusState() const = 0;
-
-  // Returns a copy of the current committed Raft configuration.
-  virtual RaftConfigPB CommittedConfig() const = 0;
-
-  virtual void DumpStatusHtml(std::ostream& out) const = 0;
-
-  // Stops running the consensus algorithm.
-  virtual void Shutdown() = 0;
-
-  // Returns the last OpId (either received or committed, depending on the
-  // 'type' argument) that the Consensus implementation knows about.
-  // Primarily used for testing purposes.
-  virtual Status GetLastOpId(OpIdType type, OpId* id) {
-    return Status::NotFound("Not implemented.");
-  }
-
-
-  // Return the log indexes which the consensus implementation would like to retain.
-  //
-  // The returned 'for_durability' index ensures that no logs are GCed before
-  // the operation is fully committed. The returned 'for_peers' index indicates
-  // the index of the farthest-behind peer so that the log will try to avoid
-  // GCing these before the peer has caught up.
-  virtual log::RetentionIndexes GetRetentionIndexes() = 0;
-
- protected:
-  friend class RefCountedThreadSafe<Consensus>;
-  friend class tablet::TabletReplica;
-  friend class master::SysCatalogTable;
-
-  // This class is refcounted.
-  virtual ~Consensus() {}
-
-  enum State {
-    kNotInitialized,
-    kInitializing,
-    kConfiguring,
-    kRunning,
-  };
- private:
-  DISALLOW_COPY_AND_ASSIGN(Consensus);
-};
-
-// Factory for replica transactions.
-// An implementation of this factory must be registered prior to consensus
-// start, and is used to create transactions when the consensus implementation receives
-// messages from the leader.
-//
-// Replica transactions execute the following way:
-//
-// - When a ReplicateMsg is first received from the leader, the Consensus
-//   instance creates the ConsensusRound and calls StartReplicaTransaction().
-//   This will trigger the Prepare(). At the same time replica consensus
-//   instance immediately stores the ReplicateMsg in the Log. Once the replicate
-//   message is stored in stable storage an ACK is sent to the leader (i.e. the
-//   replica Consensus instance does not wait for Prepare() to finish).
-//
-// - When the CommitMsg for a replicate is first received from the leader
-//   the replica waits for the corresponding Prepare() to finish (if it has
-//   not completed yet) and then proceeds to trigger the Apply().
-//
-// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
-//   a CommitMsg to the log to ensure that the operation can be properly restored
-//   on a restart.
-class ReplicaTransactionFactory {
- public:
-  virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
-
-  virtual ~ReplicaTransactionFactory() {}
-};
-
-// Context for a consensus round on the LEADER side, typically created as an
-// out-parameter of Consensus::Append.
-// This class is ref-counted because we want to ensure it stays alive for the
-// duration of the Transaction when it is associated with a Transaction, while
-// we also want to ensure it has a proper lifecycle when a ConsensusRound is
-// pushed that is not associated with a Tablet transaction.
-class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
-
- public:
-  // Ctor used for leader transactions. Leader transactions can and must specify the
-  // callbacks prior to initiating the consensus round.
-  ConsensusRound(Consensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
-                 ConsensusReplicatedCallback replicated_cb);
-
-  // Ctor used for follower/learner transactions. These transactions do not use the
-  // replicate callback and the commit callback is set later, after the transaction
-  // is actually started.
-  ConsensusRound(Consensus* consensus,
-                 const ReplicateRefPtr& replicate_msg);
-
-  ReplicateMsg* replicate_msg() {
-    return replicate_msg_->get();
-  }
-
-  const ReplicateRefPtr& replicate_scoped_refptr() {
-    return replicate_msg_;
-  }
-
-  // Returns the id of the (replicate) operation this context
-  // refers to. This is only set _after_ Consensus::Replicate(context).
-  OpId id() const {
-    return replicate_msg_->get()->id();
-  }
-
-  // Register a callback that is called by Consensus to notify that the round
-  // is considered either replicated, if 'status' is OK(), or that it has
-  // permanently failed to replicate if 'status' is anything else. If 'status'
-  // is OK() then the operation can be applied to the state machine, otherwise
-  // the operation should be aborted.
-  void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
-    replicated_cb_ = replicated_cb;
-  }
-
-  // If a continuation was set, notifies it that the round has been replicated.
-  void NotifyReplicationFinished(const Status& status);
-
-  // Binds this round such that it may not be eventually executed in any term
-  // other than 'term'.
-  // See CheckBoundTerm().
-  void BindToTerm(int64_t term) {
-    DCHECK_EQ(bound_term_, -1);
-    bound_term_ = term;
-  }
-
-  // Check for a rare race in which an operation is submitted to the LEADER in some term,
-  // then before the operation is prepared, the replica loses its leadership, receives
-  // more operations as a FOLLOWER, and then regains its leadership. We detect this case
-  // by setting the ConsensusRound's "bound term" when it is first submitted to the
-  // PREPARE queue, and validate that the term is still the same when we have finished
-  // preparing it. See KUDU-597 for details.
-  //
-  // If this round has not been bound to any term, this is a no-op.
-  Status CheckBoundTerm(int64_t current_term) const;
-
- private:
-  friend class RaftConsensusQuorumTest;
-  friend class RefCountedThreadSafe<ConsensusRound>;
-
-  ~ConsensusRound() {}
-
-  Consensus* consensus_;
-  // This round's replicate message.
-  ReplicateRefPtr replicate_msg_;
-
-  // The continuation that will be called once the transaction is
-  // deemed committed/aborted by consensus.
-  ConsensusReplicatedCallback replicated_cb_;
-
-  // The leader term that this round was submitted in. CheckBoundTerm()
-  // ensures that, when it is eventually replicated, the term has not
-  // changed in the meantime.
-  //
-  // Set to -1 if no term has been bound.
-  int64_t bound_term_;
-};
-
-} // namespace consensus
-} // namespace kudu
-
-#endif /* CONSENSUS_H_ */

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/leader_election.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index d0129f4..d0f15ce 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -23,8 +23,8 @@
 #include <unordered_map>
 #include <vector>
 
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc
index 121cb24..adbca50 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/consensus/pending_rounds.h"
 
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h
index 02a8686..ccc9cd4 100644
--- a/src/kudu/consensus/pending_rounds.h
+++ b/src/kudu/consensus/pending_rounds.h
@@ -20,14 +20,16 @@
 #include <map>
 #include <string>
 
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/util/status.h"
+#include "kudu/gutil/ref_counted.h"
 
 namespace kudu {
-namespace consensus {
+class Status;
 
+namespace consensus {
+class ConsensusRound;
 class TimeManager;
 
 // Tracks the pending consensus rounds being managed by a Raft replica (either leader

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index b7f5cdc..d50bf9a 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -377,24 +377,24 @@ Status RaftConsensus::EmulateElection() {
 }
 
 namespace {
-const char* ModeString(Consensus::ElectionMode mode) {
+const char* ModeString(RaftConsensus::ElectionMode mode) {
   switch (mode) {
-    case Consensus::NORMAL_ELECTION:
+    case RaftConsensus::NORMAL_ELECTION:
       return "leader election";
-    case Consensus::PRE_ELECTION:
+    case RaftConsensus::PRE_ELECTION:
       return "pre-election";
-    case Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE:
+    case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE:
       return "forced leader election";
   }
   __builtin_unreachable(); // silence gcc warnings
 }
-string ReasonString(Consensus::ElectionReason reason, StringPiece leader_uuid) {
+string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) {
   switch (reason) {
-    case Consensus::INITIAL_SINGLE_NODE_ELECTION:
+    case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION:
       return "initial election of a single-replica configuration";
-    case Consensus::EXTERNAL_REQUEST:
+    case RaftConsensus::EXTERNAL_REQUEST:
       return "received explicit request";
-    case Consensus::ELECTION_TIMEOUT_EXPIRED:
+    case RaftConsensus::ELECTION_TIMEOUT_EXPIRED:
       if (leader_uuid.empty()) {
         return "no leader contacted us within the election timeout";
       }
@@ -522,6 +522,12 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   return Status::OK();
 }
 
+scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
+    gscoped_ptr<ReplicateMsg> replicate_msg,
+    const ConsensusReplicatedCallback& replicated_cb) {
+  return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb));
+}
+
 void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) {
   DCHECK_EQ(name, kTimerId);
   // Start an election.
@@ -2676,5 +2682,54 @@ ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
   return cmeta_.get();
 }
 
+////////////////////////////////////////////////////////////////////////
+// ConsensusBootstrapInfo
+////////////////////////////////////////////////////////////////////////
+
+ConsensusBootstrapInfo::ConsensusBootstrapInfo()
+  : last_id(MinimumOpId()),
+    last_committed_id(MinimumOpId()) {
+}
+
+ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
+  STLDeleteElements(&orphaned_replicates);
+}
+
+////////////////////////////////////////////////////////////////////////
+// ConsensusRound
+////////////////////////////////////////////////////////////////////////
+
+ConsensusRound::ConsensusRound(RaftConsensus* consensus,
+                               gscoped_ptr<ReplicateMsg> replicate_msg,
+                               ConsensusReplicatedCallback replicated_cb)
+    : consensus_(consensus),
+      replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
+      replicated_cb_(std::move(replicated_cb)),
+      bound_term_(-1) {}
+
+ConsensusRound::ConsensusRound(RaftConsensus* consensus,
+                               const ReplicateRefPtr& replicate_msg)
+    : consensus_(consensus),
+      replicate_msg_(replicate_msg),
+      bound_term_(-1) {
+  DCHECK(replicate_msg_);
+}
+
+void ConsensusRound::NotifyReplicationFinished(const Status& status) {
+  if (PREDICT_FALSE(replicated_cb_.is_null())) return;
+  replicated_cb_.Run(status);
+}
+
+Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
+  if (PREDICT_FALSE(bound_term_ != -1 &&
+                    bound_term_ != current_term)) {
+    return Status::Aborted(
+      strings::Substitute(
+        "Transaction submitted in term $0 cannot be replicated in term $1",
+        bound_term_, current_term));
+  }
+  return Status::OK();
+}
+
 }  // namespace consensus
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index f00a8c2..9ce29ec 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -24,41 +24,99 @@
 #include <utility>
 #include <vector>
 
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/peer_manager.h"
 #include "kudu/consensus/pending_rounds.h"
 #include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/failure_detector.h"
+#include "kudu/util/status_callback.h"
 
 namespace kudu {
 
 class Counter;
 class FailureDetector;
 class HostPort;
+class MonoDelta;
+class Status;
 class ThreadPool;
 
-namespace server {
-class Clock;
+namespace log {
+class Log;
+struct RetentionIndexes;
 }
 
 namespace rpc {
 class Messenger;
 }
 
+namespace server {
+class Clock;
+}
+
+namespace tserver {
+class TabletServerErrorPB;
+}
+
 namespace consensus {
+
 class ConsensusMetadata;
+class ConsensusRound;
 class Peer;
 class PeerProxyFactory;
 class PeerManager;
+class ReplicaTransactionFactory;
 class TimeManager;
+
+struct ConsensusBootstrapInfo;
 struct ElectionResult;
 
-class RaftConsensus : public Consensus,
+// Options passed to RaftConsensus::Create().
+struct ConsensusOptions {
+  // Id of the tablet being replicated; presumably the value later reported by
+  // RaftConsensus::tablet_id() — TODO confirm.
+  std::string tablet_id;
+};
+
+// Alias for a Raft term number (terms are int64_t throughout this header).
+typedef int64_t ConsensusTerm;
+// Callback invoked with a ConsensusRound's replication outcome: OK() if the
+// round was replicated, otherwise the status describing why it failed.
+typedef StatusCallback ConsensusReplicatedCallback;
+
+class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
                       public PeerMessageQueueObserver {
  public:
+
+  // Modes for StartElection().
+  enum ElectionMode {
+    // A normal leader election. Peers will not vote for this node
+    // if they believe that a leader is alive.
+    NORMAL_ELECTION,
+
+    // A "pre-election". Peers will vote as they would for a normal
+    // election, except that the votes will not be "binding". In other
+    // words, they will not durably record their vote.
+    PRE_ELECTION,
+
+    // In this mode, peers will vote for this candidate even if they
+    // think a leader is alive. This can be used for a faster hand-off
+    // between a leader and one of its replicas.
+    ELECT_EVEN_IF_LEADER_IS_ALIVE
+  };
+
+  // Reasons for StartElection().
+  enum ElectionReason {
+    // The election is being called because the Raft configuration has only
+    // a single node and has just started up.
+    INITIAL_SINGLE_NODE_ELECTION,
+
+    // The election is being called because the timeout expired. In other
+    // words, the previous leader probably failed (or there was no leader
+    // in this term)
+    ELECTION_TIMEOUT_EXPIRED,
+
+    // The election is being started because of an explicit external request.
+    EXTERNAL_REQUEST
+  };
+
   static scoped_refptr<RaftConsensus> Create(
     ConsensusOptions options,
     std::unique_ptr<ConsensusMetadata> cmeta,
@@ -85,64 +143,150 @@ class RaftConsensus : public Consensus,
                 std::shared_ptr<MemTracker> parent_mem_tracker,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
 
-  virtual ~RaftConsensus();
+  // Starts running the Raft consensus algorithm.
+  Status Start(const ConsensusBootstrapInfo& info);
 
-  Status Start(const ConsensusBootstrapInfo& info) override;
-
-  bool IsRunning() const override;
+  // Returns true if RaftConsensus is running.
+  bool IsRunning() const;
 
   // Emulates an election by increasing the term number and asserting leadership
   // in the configuration by sending a NO_OP to other peers.
   // This is NOT safe to use in a distributed configuration with failure detection
   // enabled, as it could result in a split-brain scenario.
-  Status EmulateElection() override;
+  Status EmulateElection();
+
+  // Triggers a leader election.
+  Status StartElection(ElectionMode mode, ElectionReason reason);
 
-  Status StartElection(ElectionMode mode, ElectionReason reason) override;
+  // Wait until the node has LEADER role.
+  // Returns Status::TimedOut if the role is not LEADER within 'timeout'.
+  Status WaitUntilLeaderForTests(const MonoDelta& timeout);
 
-  Status WaitUntilLeaderForTests(const MonoDelta& timeout) override;
+  // Implement a LeaderStepDown() request.
+  Status StepDown(LeaderStepDownResponsePB* resp);
 
-  Status StepDown(LeaderStepDownResponsePB* resp) override;
+  // Creates a new ConsensusRound, the entity that owns all the data
+  // structures required for a consensus round, such as the ReplicateMsg
+  // (and later on the CommitMsg). ConsensusRound will also point to and
+  // increase the reference count for the provided callbacks.
+  scoped_refptr<ConsensusRound> NewRound(
+      gscoped_ptr<ReplicateMsg> replicate_msg,
+      const ConsensusReplicatedCallback& replicated_cb);
 
   // Call StartElection(), log a warning if the call fails (usually due to
   // being shut down).
   void ReportFailureDetected(const std::string& name, const Status& msg);
 
-  Status Replicate(const scoped_refptr<ConsensusRound>& round) override;
-
-  Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) override;
-
+  // Called by a Leader to replicate an entry to the state machine.
+  //
+  // From the leader instance perspective execution proceeds as follows:
+  //
+  //           Leader                               RaftConfig
+  //             +                                     +
+  //     1) Req->| Replicate()                         |
+  //             |                                     |
+  //     2)      +-------------replicate-------------->|
+  //             |<---------------ACK------------------+
+  //             |                                     |
+  //     3)      +--+                                  |
+  //           <----+ round.NotifyReplicationFinished()|
+  //             |                                     |
+  //     3a)     |  +------ update commitIndex ------->|
+  //             |                                     |
+  //
+  // 1) Caller calls Replicate(), method returns immediately to the caller and
+  //    runs asynchronously.
+  //
+  // 2) Leader replicates the entry to the peers using the consensus
+  //    algorithm, proceeds as soon as a majority of voters acknowledges the
+  //    entry.
+  //
+  // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished,
+  //    which calls the ConsensusReplicatedCallback.
+  //
+  // 3a) The leader asynchronously notifies other peers of the new
+  //     commit index, which tells them to apply the operation.
+  //
+  // This method can only be called on the leader, i.e. role() == LEADER
+  Status Replicate(const scoped_refptr<ConsensusRound>& round);
+
+  // Ensures that the consensus implementation is currently acting as LEADER,
+  // and thus is allowed to submit operations to be prepared before they are
+  // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the
+  // implementation also stores the current term inside the round's "bound_term"
+  // member. When we eventually are about to replicate the transaction, we verify
+  // that the term has not changed in the meantime.
+  Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round);
+
+  // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
+  // state machines. This is equivalent to "AppendEntries()" in Raft
+  // terminology.
+  //
+  // ConsensusRequestPB contains a sequence of 0 or more operations to apply
+  // on the replica. If there are 0 operations the request is considered
+  // 'status-only' i.e. the leader is communicating with the follower only
+  // in order to pass back and forth information on watermarks (eg committed
+  // operation ID, replicated op id, etc).
+  //
+  // If the sequence contains 1 or more operations they will be replicated
+  // in the same order as the leader, and submitted for asynchronous Prepare
+  // in the same order.
+  //
+  // The leader also provides information on the index of the latest
+  // operation considered committed by consensus. The replica uses this
+  // information to update the state of any pending (previously replicated/prepared)
+  // transactions.
+  //
+  // Returns Status::OK if the response has been filled (regardless of accepting
+  // or rejecting the specific request). Returns non-OK Status if a specific
+  // error response could not be formed, which will result in the service
+  // returning an UNKNOWN_ERROR RPC error code to the caller and including the
+  // stringified Status message.
   Status Update(const ConsensusRequestPB* request,
-                ConsensusResponsePB* response) override;
+                ConsensusResponsePB* response);
 
+  // Messages sent from CANDIDATEs to voting peers to request their vote
+  // in leader election.
   Status RequestVote(const VoteRequestPB* request,
-                     VoteResponsePB* response) override;
+                     VoteResponsePB* response);
 
+  // Implement a ChangeConfig() request.
   Status ChangeConfig(const ChangeConfigRequestPB& req,
                       const StatusCallback& client_cb,
-                      boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;
+                      boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
 
+  // Implement an UnsafeChangeConfig() request.
   Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
-                            tserver::TabletServerErrorPB::Code* error_code) override;
+                            tserver::TabletServerErrorPB::Code* error_code);
 
-  Status GetLastOpId(OpIdType type, OpId* id) override;
+  // Returns the last OpId (either received or committed, depending on the
+  // 'type' argument) that the Consensus implementation knows about.
+  // Primarily used for testing purposes.
+  Status GetLastOpId(OpIdType type, OpId* id);
 
-  RaftPeerPB::Role role() const override;
+  // Returns the current Raft role of this instance.
+  RaftPeerPB::Role role() const;
 
+  // Returns the uuid of this peer.
   // Thread-safe.
-  const std::string& peer_uuid() const override;
+  const std::string& peer_uuid() const;
 
+  // Returns the id of the tablet whose updates this consensus instance helps coordinate.
   // Thread-safe.
-  const std::string& tablet_id() const override;
+  const std::string& tablet_id() const;
 
-  scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
+  scoped_refptr<TimeManager> time_manager() const { return time_manager_; }
 
-  ConsensusStatePB ConsensusState() const override;
+  // Returns a copy of the state of the consensus system.
+  ConsensusStatePB ConsensusState() const;
 
-  RaftConfigPB CommittedConfig() const override;
+  // Returns a copy of the current committed Raft configuration.
+  RaftConfigPB CommittedConfig() const;
 
-  void DumpStatusHtml(std::ostream& out) const override;
+  void DumpStatusHtml(std::ostream& out) const;
 
-  void Shutdown() override;
+  // Stop running the Raft consensus algorithm.
+  void Shutdown();
 
   // Makes this peer advance it's term (and step down if leader), for tests.
   Status AdvanceTermForTests(int64_t new_term);
@@ -158,17 +302,24 @@ class RaftConsensus : public Consensus,
   // Updates the committed_index and triggers the Apply()s for whatever
   // transactions were pending.
   // This is idempotent.
-  void NotifyCommitIndex(int64_t commit_index) override;
+  void NotifyCommitIndex(int64_t commit_index);
 
-  void NotifyTermChange(int64_t term) override;
+  void NotifyTermChange(int64_t term);
 
   void NotifyFailedFollower(const std::string& uuid,
                             int64_t term,
-                            const std::string& reason) override;
+                            const std::string& reason);
 
-  log::RetentionIndexes GetRetentionIndexes() override;
+  // Return the log indexes which the consensus implementation would like to retain.
+  //
+  // The returned 'for_durability' index ensures that no logs are GCed before
+  // the operation is fully committed. The returned 'for_peers' index indicates
+  // the index of the farthest-behind peer so that the log will try to avoid
+  // GCing these before the peer has caught up.
+  log::RetentionIndexes GetRetentionIndexes();
 
  private:
+  friend class RefCountedThreadSafe<RaftConsensus>;
   friend class RaftConsensusQuorumTest;
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
@@ -224,6 +375,9 @@ class RaftConsensus : public Consensus,
   using LockGuard = std::lock_guard<simple_spinlock>;
   using UniqueLock = std::unique_lock<simple_spinlock>;
 
+  // Private because this class is refcounted.
+  ~RaftConsensus();
+
   // Returns string description for State enum value.
   static const char* State_Name(State state);
 
@@ -636,5 +790,142 @@ class RaftConsensus : public Consensus,
   DISALLOW_COPY_AND_ASSIGN(RaftConsensus);
 };
 
+// After completing bootstrap, some of the results need to be plumbed through
+// into RaftConsensus. An instance of this struct is the argument to
+// RaftConsensus::Start().
+struct ConsensusBootstrapInfo {
+  // Initializes both op ids to MinimumOpId().
+  ConsensusBootstrapInfo();
+  // Deletes every message in 'orphaned_replicates'.
+  ~ConsensusBootstrapInfo();
+
+  // The id of the last operation in the log.
+  OpId last_id;
+
+  // The id of the last committed operation in the log.
+  OpId last_committed_id;
+
+  // REPLICATE messages which were in the log with no accompanying
+  // COMMIT. These need to be passed along to consensus init in order
+  // to potentially commit them.
+  //
+  // These are owned by the ConsensusBootstrapInfo instance.
+  std::vector<ReplicateMsg*> orphaned_replicates;
+
+ private:
+  // Non-copyable: a copy would share, and later double-delete, the raw
+  // pointers owned by 'orphaned_replicates'.
+  DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
+};
+
+// Factory for replica transactions.
+// An implementation of this factory must be registered prior to consensus
+// start, and is used to create transactions when RaftConsensus receives
+// messages from the leader.
+//
+// Replica transactions execute the following way:
+//
+// - When a ReplicateMsg is first received from the leader, the RaftConsensus
+//   instance creates the ConsensusRound and calls StartReplicaTransaction().
+//   This will trigger the Prepare(). At the same time the replica's
+//   RaftConsensus instance immediately stores the ReplicateMsg in the Log. Once
+//   the replicate message is stored in stable storage an ACK is sent to the
+//   leader (i.e. the replica RaftConsensus instance does not wait for
+//   Prepare() to finish).
+//
+// - When the CommitMsg for a replicate is first received from the leader
+//   the replica waits for the corresponding Prepare() to finish (if it has
+//   not completed yet) and then proceeds to trigger the Apply().
+//
+// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
+//   a CommitMsg to the log to ensure that the operation can be properly restored
+//   on a restart.
+class ReplicaTransactionFactory {
+ public:
+  // Starts a replica-side transaction for the round 'context'. Per the flow
+  // described above, RaftConsensus calls this when a ReplicateMsg is first
+  // received from the leader.
+  virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+
+  virtual ~ReplicaTransactionFactory() {}
+};
+
+// Context for a single consensus round, i.e. one replicated operation. On the
+// LEADER it is typically obtained from RaftConsensus::NewRound(); on
+// followers/learners RaftConsensus creates it around a ReplicateMsg received
+// from the leader.
+// This class is ref-counted because we want to ensure it stays alive for the
+// duration of the Transaction when it is associated with a Transaction, while
+// we also want to ensure it has a proper lifecycle when a ConsensusRound is
+// pushed that is not associated with a Tablet transaction.
+class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
+
+ public:
+  // Ctor used for leader transactions. Leader transactions can and must specify the
+  // callbacks prior to initiating the consensus round. Takes ownership of
+  // 'replicate_msg'.
+  ConsensusRound(RaftConsensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
+                 ConsensusReplicatedCallback replicated_cb);
+
+  // Ctor used for follower/learner transactions. No replicated callback is
+  // supplied here; one may be registered later via
+  // SetConsensusReplicatedCallback(), after the transaction is actually started.
+  ConsensusRound(RaftConsensus* consensus,
+                 const ReplicateRefPtr& replicate_msg);
+
+  // Returns this round's replicate message. The message is owned by the
+  // shared RefCountedReplicate, not by the caller.
+  ReplicateMsg* replicate_msg() {
+    return replicate_msg_->get();
+  }
+
+  // Returns the ref-counted wrapper around this round's replicate message.
+  const ReplicateRefPtr& replicate_scoped_refptr() {
+    return replicate_msg_;
+  }
+
+  // Returns the id of the (replicate) operation this context
+  // refers to. This is only set _after_ RaftConsensus::Replicate(context).
+  OpId id() const {
+    return replicate_msg_->get()->id();
+  }
+
+  // Register a callback that is called by RaftConsensus to notify that the round
+  // is considered either replicated, if 'status' is OK(), or that it has
+  // permanently failed to replicate if 'status' is anything else. If 'status'
+  // is OK() then the operation can be applied to the state machine, otherwise
+  // the operation should be aborted. Replaces any previously set callback.
+  void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
+    replicated_cb_ = replicated_cb;
+  }
+
+  // If a callback was set, invokes it with 'status'; otherwise does nothing.
+  void NotifyReplicationFinished(const Status& status);
+
+  // Binds this round such that it may not be eventually executed in any term
+  // other than 'term'. A round may be bound at most once (DCHECKed).
+  // See CheckBoundTerm().
+  void BindToTerm(int64_t term) {
+    DCHECK_EQ(bound_term_, -1);
+    bound_term_ = term;
+  }
+
+  // Check for a rare race in which an operation is submitted to the LEADER in some term,
+  // then before the operation is prepared, the replica loses its leadership, receives
+  // more operations as a FOLLOWER, and then regains its leadership. We detect this case
+  // by setting the ConsensusRound's "bound term" when it is first submitted to the
+  // PREPARE queue, and validate that the term is still the same when we have finished
+  // preparing it. See KUDU-597 for details.
+  //
+  // Returns Status::Aborted if the bound term differs from 'current_term'.
+  // If this round has not been bound to any term, this is a no-op.
+  Status CheckBoundTerm(int64_t current_term) const;
+
+ private:
+  friend class RefCountedThreadSafe<ConsensusRound>;
+  friend class RaftConsensusQuorumTest;
+
+  // Private because this class is ref-counted; destroyed only when the last
+  // reference is released.
+  ~ConsensusRound() {}
+
+  // The RaftConsensus instance this round was created for. Not owned.
+  RaftConsensus* consensus_;
+  // This round's replicate message.
+  ReplicateRefPtr replicate_msg_;
+
+  // The continuation that will be called once the transaction is
+  // deemed committed/aborted by consensus.
+  ConsensusReplicatedCallback replicated_cb_;
+
+  // The leader term that this round was submitted in. CheckBoundTerm()
+  // ensures that, when it is eventually replicated, the term has not
+  // changed in the meantime.
+  //
+  // Set to -1 if no term has been bound.
+  int64_t bound_term_;
+};
+
 }  // namespace consensus
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index a0933c8..d45890b 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -890,8 +890,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     // non-shutdown peer in the list become leader.
     int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
     LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
-    ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
-                                        Consensus::EXTERNAL_REQUEST));
+    ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
+                                        RaftConsensus::EXTERNAL_REQUEST));
     WaitUntilLeaderForTests(new_leader.get());
     LOG(INFO) << "Election won";
     int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/integration-tests/ts_tablet_manager-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 9d5c6a5..547fef0 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -134,7 +134,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
   ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
   ValueDeleter deleter(&ts_map);
 
-  // Collect the TabletReplicas so we get direct access to Consensus.
+  // Collect the TabletReplicas so we get direct access to RaftConsensus.
   vector<scoped_refptr<TabletReplica> > tablet_replicas;
   for (int replica = 0; replica < kNumReplicas; replica++) {
     MiniTabletServer* ts = cluster_->mini_tablet_server(replica);
@@ -157,7 +157,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
     SCOPED_TRACE(Substitute("Iter: $0", i));
     int new_leader_idx = rand() % 2;
     LOG(INFO) << "Electing peer " << new_leader_idx << "...";
-    consensus::Consensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
+    consensus::RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
     ASSERT_OK(con->EmulateElection());
     LOG(INFO) << "Waiting for servers to agree...";
     ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index c67b423..df7935e 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -222,10 +222,10 @@ namespace master {
 using base::subtle::NoBarrier_CompareAndSwap;
 using base::subtle::NoBarrier_Load;
 using cfile::TypeEncodingInfo;
-using consensus::Consensus;
 using consensus::ConsensusServiceProxy;
 using consensus::ConsensusStatePB;
 using consensus::GetConsensusRole;
+using consensus::RaftConsensus;
 using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
 using consensus::kMinimumTerm;
@@ -747,7 +747,7 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
   const string& uuid = master_->fs_manager()->uuid();
   if (!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid) {
     return Status::IllegalState(
-        Substitute("Node $0 not leader. Consensus state: $1",
+        Substitute("Node $0 not leader. Raft Consensus state: $1",
                     uuid, SecureShortDebugString(cstate)));
   }
 
@@ -925,7 +925,7 @@ void CatalogManager::PrepareForLeadershipTask() {
     // Hack to block this function until InitSysCatalogAsync() is finished.
     shared_lock<LockType> l(lock_);
   }
-  const Consensus* consensus = sys_catalog_->tablet_replica()->consensus();
+  const RaftConsensus* consensus = sys_catalog_->tablet_replica()->consensus();
   const int64_t term_before_wait = consensus->ConsensusState().current_term();
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
@@ -967,7 +967,7 @@ void CatalogManager::PrepareForLeadershipTask() {
     // task. If the error is considered fatal, LOG(FATAL) is called.
     const auto check = [this](
         std::function<Status()> func,
-        const Consensus& consensus,
+        const RaftConsensus& consensus,
         int64_t start_term,
         const char* op_description) {
 
@@ -1101,7 +1101,7 @@ bool CatalogManager::IsInitialized() const {
 }
 
 RaftPeerPB::Role CatalogManager::Role() const {
-  scoped_refptr<consensus::Consensus> consensus;
+  scoped_refptr<consensus::RaftConsensus> consensus;
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
     if (state_ == kRunning) {
@@ -4164,7 +4164,7 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
   const string& uuid = catalog_->master_->fs_manager()->uuid();
   if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
     leader_status_ = Status::IllegalState(
-        Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
+        Substitute("Not the leader. Local UUID: $0, Raft Consensus state: $1",
                    uuid, SecureShortDebugString(cstate)));
     return;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index c25aae7..bf7ae7a 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -276,7 +276,7 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
 
 void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const string& reason) {
   CHECK_EQ(tablet_id, tablet_replica_->tablet_id());
-  scoped_refptr<consensus::Consensus> consensus  = tablet_replica_->shared_consensus();
+  scoped_refptr<consensus::RaftConsensus> consensus  = tablet_replica_->shared_consensus();
   if (!consensus) {
     LOG_WITH_PREFIX(WARNING) << "Received notification of tablet state change "
                              << "but tablet no longer running. Tablet ID: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 67c408b..61cb3bb 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -25,7 +25,6 @@
 #include <utility>
 #include <vector>
 
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -84,7 +83,6 @@ METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time
                         "locks.",
                         10000000, 2);
 
-using consensus::Consensus;
 using consensus::ConsensusBootstrapInfo;
 using consensus::ConsensusMetadata;
 using consensus::ConsensusOptions;
@@ -318,7 +316,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
     MonoTime now(MonoTime::Now());
     MonoDelta elapsed(now - start);
     if (elapsed > timeout) {
-      return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State; $1",
+      return Status::TimedOut(Substitute("Raft Consensus is not running after waiting for $0: $1",
                                          elapsed.ToString(), TabletStatePB_Name(cached_state)));
     }
     SleepFor(MonoDelta::FromMilliseconds(1L << backoff_exp));

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 51cd387..15ec7dc 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -24,8 +24,8 @@
 #include <string>
 #include <vector>
 
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/log.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/ref_counted.h"
@@ -73,7 +73,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
 
   // Initializes the TabletReplica, namely creating the Log and initializing
-  // Consensus.
+  // RaftConsensus.
   Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
               const scoped_refptr<server::Clock>& clock,
               const std::shared_ptr<rpc::Messenger>& messenger,
@@ -122,12 +122,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   virtual Status StartReplicaTransaction(
       const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
 
-  consensus::Consensus* consensus() {
+  consensus::RaftConsensus* consensus() {
     std::lock_guard<simple_spinlock> lock(lock_);
     return consensus_.get();
   }
 
-  scoped_refptr<consensus::Consensus> shared_consensus() const {
+  scoped_refptr<consensus::RaftConsensus> shared_consensus() const {
     std::lock_guard<simple_spinlock> lock(lock_);
     return consensus_;
   }
@@ -288,7 +288,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   scoped_refptr<log::Log> log_;
   std::shared_ptr<Tablet> tablet_;
   std::shared_ptr<rpc::Messenger> messenger_;
-  scoped_refptr<consensus::Consensus> consensus_;
+  scoped_refptr<consensus::RaftConsensus> consensus_;
   simple_spinlock prepare_replicate_lock_;
 
   // Lock protecting state_, last_status_, as well as smart pointers to collaborating

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index f0696a6..72db821 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -24,11 +24,11 @@
 
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/util/auto_release_pool.h"
 #include "kudu/util/locks.h"
-#include "kudu/util/status.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index cb6bf64..57f841a 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -19,7 +19,6 @@
 
 #include <mutex>
 
-#include "kudu/consensus/consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/rpc/result_tracker.h"
@@ -36,11 +35,9 @@ namespace kudu {
 namespace tablet {
 
 using consensus::CommitMsg;
-using consensus::Consensus;
-using consensus::ConsensusRound;
-using consensus::ReplicateMsg;
-using consensus::CommitMsg;
 using consensus::DriverType;
+using consensus::RaftConsensus;
+using consensus::ReplicateMsg;
 using log::Log;
 using rpc::RequestIdPB;
 using rpc::ResultTracker;
@@ -83,7 +80,7 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba
 ////////////////////////////////////////////////////////////
 
 TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
-                                     Consensus* consensus,
+                                     RaftConsensus* consensus,
                                      Log* log,
                                      ThreadPool* prepare_pool,
                                      ThreadPool* apply_pool,

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 044edda..e0a987c 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -21,7 +21,7 @@
 #include <string>
 #include <kudu/rpc/result_tracker.h>
 
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/server/clock.h"
@@ -69,7 +69,7 @@ class TransactionTracker;
 //      follower and ReplicationFinished() has already been called, then we can move
 //      on to ApplyAsync().
 //
-//  4 - The Consensus implementation calls ReplicationFinished()
+//  4 - RaftConsensus calls ReplicationFinished()
 //
 //      This is triggered by consensus when the commit index moves past our own
 //      OpId. On followers, this can happen before Prepare() finishes, and thus
@@ -220,7 +220,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   // Construct TransactionDriver. TransactionDriver does not take ownership
   // of any of the objects pointed to in the constructor's arguments.
   TransactionDriver(TransactionTracker* txn_tracker,
-                    consensus::Consensus* consensus,
+                    consensus::RaftConsensus* consensus,
                     log::Log* log,
                     ThreadPool* prepare_pool,
                     ThreadPool* apply_pool,
@@ -247,7 +247,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   // at the next synchronization point.
   void Abort(const Status& status);
 
-  // Callback from Consensus when replication is complete, and thus the operation
+  // Callback from RaftConsensus when replication is complete, and thus the operation
   // is considered "committed" from the consensus perspective (ie it will be
   // applied on every node, and not ever truncated from the state machine history).
   // If status is anything different from OK() we don't proceed with the apply.
@@ -307,7 +307,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   // Submits ApplyTask to the apply pool.
   Status ApplyAsync();
 
-  // Calls Transaction::Apply() followed by Consensus::Commit() with the
+  // Calls Transaction::Apply() followed by RaftConsensus::Commit() with the
   // results from the Apply().
   void ApplyTask();
 
@@ -343,7 +343,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   void RegisterFollowerTransactionOnResultTracker();
 
   TransactionTracker* const txn_tracker_;
-  consensus::Consensus* const consensus_;
+  consensus::RaftConsensus* const consensus_;
   log::Log* const log_;
   ThreadPool* const prepare_pool_;
   ThreadPool* const apply_pool_;
@@ -355,7 +355,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   mutable simple_spinlock lock_;
 
   // A copy of the transaction's OpId, set when the transaction first
-  // receives one from Consensus and uninitialized until then.
+  // receives one from RaftConsensus and uninitialized until then.
   // TODO(todd): we have three separate copies of this now -- in TransactionState,
   // CommitMsg, and here... we should be able to consolidate!
   consensus::OpId op_id_copy_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index f8ea48d..c9aa80f 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -36,10 +36,6 @@ struct DecodedRowOperation;
 class ConstContiguousRow;
 class RowwiseRowBlockPB;
 
-namespace consensus {
-class Consensus;
-}
-
 namespace tserver {
 class WriteRequestPB;
 class WriteResponsePB;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 2926792..85faa49 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -120,12 +120,12 @@ Status TabletCopySourceSession::Init() {
   // We do this after snapshotting the log to avoid a scenario where the latest
   // entry in the log has a term higher than the term stored in the consensus
   // metadata, which will results in a CHECK failure on RaftConsensus init.
-  scoped_refptr<consensus::Consensus> consensus = tablet_replica_->shared_consensus();
+  scoped_refptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
   if (!consensus) {
     tablet::TabletStatePB tablet_state = tablet_replica_->state();
     return Status::IllegalState(Substitute(
         "Unable to initialize tablet copy session for tablet $0. "
-        "Consensus is not available. Tablet state: $1 ($2)",
+        "Raft Consensus is not available. Tablet state: $1 ($2)",
         tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
   }
   initial_cstate_ = consensus->ConsensusState();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 82740ab..68fa88b 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,7 +28,7 @@
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/casts.h"
@@ -103,7 +103,6 @@ DECLARE_int32(tablet_history_max_age_sec);
 using google::protobuf::RepeatedPtrField;
 using kudu::consensus::ChangeConfigRequestPB;
 using kudu::consensus::ChangeConfigResponsePB;
-using kudu::consensus::Consensus;
 using kudu::consensus::ConsensusRequestPB;
 using kudu::consensus::ConsensusResponsePB;
 using kudu::consensus::GetLastOpIdRequestPB;
@@ -113,6 +112,7 @@ using kudu::consensus::LeaderStepDownRequestPB;
 using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::consensus::UnsafeChangeConfigRequestPB;
 using kudu::consensus::UnsafeChangeConfigResponsePB;
+using kudu::consensus::RaftConsensus;
 using kudu::consensus::RunLeaderElectionRequestPB;
 using kudu::consensus::RunLeaderElectionResponsePB;
 using kudu::consensus::StartTabletCopyRequestPB;
@@ -214,10 +214,10 @@ template<class RespClass>
 bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
                            RespClass* resp,
                            rpc::RpcContext* context,
-                           scoped_refptr<Consensus>* consensus) {
+                           scoped_refptr<RaftConsensus>* consensus) {
   *consensus = replica->shared_consensus();
   if (!*consensus) {
-    Status s = Status::ServiceUnavailable("Consensus unavailable. Tablet not running");
+    Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running");
     SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::TABLET_NOT_RUNNING, context);
     return false;
@@ -855,8 +855,8 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
 
   replica->permanent_uuid();
 
-  // Submit the update directly to the TabletReplica's Consensus instance.
-  scoped_refptr<Consensus> consensus;
+  // Submit the update directly to the TabletReplica's RaftConsensus instance.
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->Update(req, resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -886,7 +886,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
   }
 
   // Submit the vote request directly to the consensus instance.
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->RequestVote(req, resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -911,7 +911,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
     return;
   }
 
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   boost::optional<TabletServerErrorPB::Code> error_code;
   Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
@@ -935,7 +935,7 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB*
     return;
   }
 
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   TabletServerErrorPB::Code error_code;
   Status s = consensus->UnsafeChangeConfig(*req, &error_code);
@@ -966,11 +966,11 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
     return;
   }
 
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->StartElection(
-      consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
-      consensus::Consensus::EXTERNAL_REQUEST);
+      consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
+      consensus::RaftConsensus::EXTERNAL_REQUEST);
   if (PREDICT_FALSE(!s.ok())) {
     SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::UNKNOWN_ERROR,
@@ -992,7 +992,7 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
     return;
   }
 
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->StepDown(resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -1022,7 +1022,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
                          TabletServerErrorPB::TABLET_NOT_RUNNING, context);
     return;
   }
-  scoped_refptr<Consensus> consensus;
+  scoped_refptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) {
     HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
@@ -1057,7 +1057,7 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR
       continue;
     }
 
-    scoped_refptr<Consensus> consensus(replica->shared_consensus());
+    scoped_refptr<RaftConsensus> consensus(replica->shared_consensus());
     if (!consensus) {
       continue;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index db4c51d..e4a676c 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -303,7 +303,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
     "Couldn't create tablet metadata");
 
   // We must persist the consensus metadata to disk before starting a new
-  // tablet's TabletReplica and Consensus implementation.
+  // tablet's TabletReplica and RaftConsensus implementation.
   unique_ptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(),
                                                   config, consensus::kMinimumTerm, &cmeta),
@@ -645,10 +645,10 @@ Status TSTabletManager::DeleteTablet(
   // restarting the tablet if the local replica committed a higher config
   // change op during that time, or potentially something else more invasive.
   if (cas_config_opid_index_less_or_equal && !tablet_deleted) {
-    scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+    scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
     if (!consensus) {
       *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
-      return Status::IllegalState("Consensus not available. Tablet shutting down");
+      return Status::IllegalState("Raft Consensus not available. Tablet shutting down");
     }
     RaftConfigPB committed_config = consensus->CommittedConfig();
     if (committed_config.opid_index() > *cas_config_opid_index_less_or_equal) {
@@ -987,7 +987,7 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id,
   reported_tablet->set_schema_version(replica->tablet_metadata()->schema_version());
 
   // We cannot get consensus state information unless the TabletReplica is running.
-  scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+  scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (consensus) {
     *reported_tablet->mutable_consensus_state() = consensus->ConsensusState();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 4aa9407..c4599ae 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -253,7 +253,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& re
                                  .PartitionDebugString(replica->tablet_metadata()->partition(),
                                                        replica->tablet_metadata()->schema());
 
-      scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+      scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
       (*output) << Substitute(
           // Table name, tablet id, partition
           "<tr><td>$0</td><td>$1</td><td>$2</td>"
@@ -456,7 +456,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
   string id;
   scoped_refptr<TabletReplica> replica;
   if (!LoadTablet(tserver_, req, &id, &replica, output)) return;
-  scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+  scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (!consensus) {
     *output << "Tablet " << EscapeForHtmlToString(id) << " not running";
     return;


Mime
View raw message