kudu-commits mailing list archives

From t...@apache.org
Subject kudu git commit: consensus: split ReplicaState in twain[1]
Date Thu, 20 Oct 2016 21:28:59 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 65cb2edf5 -> 27992c120


consensus: split ReplicaState in twain[1]

Following on the previous refactor, this splits out the parts of the
ReplicaState class that deal with pending consensus rounds into a new
PendingRounds class.
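The following is a rough, self-contained model of the state that PendingRounds now owns. It is a sketch, not the Kudu code itself. Rounds are kept in an index-ordered `std::map`. Committing a prefix removes those rounds from the map and advances the last-committed OpId, and aborting removes the suffix. The `OpId` and `Round` structs, the `committed`/`aborted` flags, and the class name `PendingRoundsSketch` are hypothetical stand-ins for Kudu's `OpId` protobuf and `ConsensusRound`. The real class also notifies each round through `NotifyReplicationFinished()` and logs with a "T <tablet> P <peer>" prefix.

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for the OpId protobuf: (term, index).
struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical stand-in for ConsensusRound.
struct Round {
  OpId id{0, 0};
  bool committed = false;
  bool aborted = false;
};

// Not thread-safe: the owner must provide external synchronization.
class PendingRoundsSketch {
 public:
  void AddPendingOperation(const std::shared_ptr<Round>& round) {
    // Mirrors InsertOrDie(): a duplicate index is a programming error.
    if (!pending_.emplace(round->id.index, round).second) {
      throw std::logic_error("duplicate pending index");
    }
  }

  std::shared_ptr<Round> GetPendingOpByIndexOrNull(int64_t index) const {
    auto it = pending_.find(index);
    return it == pending_.end() ? nullptr : it->second;
  }

  // True if 'op' is at or below the committed index, or matches a pending
  // round with the same term. Sets *term_mismatch if a round is pending at
  // that index under a different term.
  bool IsOpCommittedOrPending(const OpId& op, bool* term_mismatch) const {
    *term_mismatch = false;
    if (op.index <= last_committed_.index) return true;
    std::shared_ptr<Round> round = GetPendingOpByIndexOrNull(op.index);
    if (!round) return false;
    if (round->id.term != op.term) {
      *term_mismatch = true;
      return false;
    }
    return true;
  }

  // Id of the pending round with the highest index, or (0, 0) if none.
  OpId GetLastPendingTransactionOpId() const {
    return pending_.empty() ? OpId{0, 0} : std::prev(pending_.end())->second->id;
  }

  // Marks every pending round with index <= committed_index as committed and
  // erases it. A no-op if we have already committed at least that far.
  void AdvanceCommittedIndex(int64_t committed_index) {
    if (last_committed_.index >= committed_index) return;
    auto end = pending_.upper_bound(committed_index);
    for (auto it = pending_.begin(); it != end;) {
      it->second->committed = true;
      last_committed_ = it->second->id;
      it = pending_.erase(it);
    }
  }

  // Aborts and erases every pending round with index > 'index'.
  void AbortOpsAfter(int64_t index) {
    for (auto it = pending_.upper_bound(index); it != pending_.end();) {
      it->second->aborted = true;
      it = pending_.erase(it);
    }
  }

  int64_t GetCommittedIndex() const { return last_committed_.index; }
  int GetNumPendingTxns() const { return static_cast<int>(pending_.size()); }

 private:
  std::map<int64_t, std::shared_ptr<Round>> pending_;
  OpId last_committed_{0, 0};  // Analogous to MinimumOpId().
};
```

Keying the map by log index keeps the rounds ordered, so "commit up to N" and "abort after N" are both a single `upper_bound()` lookup followed by a range erase.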

The change is mostly mechanical. The new class is made non-thread-safe
and we still use the synchronization of the ReplicaState class to
protect it. Since all of the methods dealing with pending rounds were
already 'Unlocked', this should be safe. The goal is to later make the
synchronization more fine-grained.

To that end, all of the methods were renamed to remove the 'Unlocked'
suffix.
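Here is a minimal sketch of that arrangement, assuming `std::mutex` in place of ReplicaState's `simple_spinlock` and its `LockForRead()`/`UniqueLock` helpers. The tracker has no lock of its own and no "Unlocked" suffixes. The owning class acquires a lock held by a sibling member before touching it, as RaftConsensus does with `pending_` and `state_`. `Tracker`, `StateWithLock`, and `Owner` are hypothetical names used only for illustration.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for PendingRounds: not thread-safe and carries no
// "Unlocked" suffixes; synchronization is entirely the owner's job.
class Tracker {
 public:
  void Add(int64_t index, std::string payload) {
    items_[index] = std::move(payload);
  }
  int64_t Count() const { return static_cast<int64_t>(items_.size()); }

 private:
  std::map<int64_t, std::string> items_;
};

// Hypothetical stand-in for ReplicaState: it owns the lock.
class StateWithLock {
 public:
  std::unique_lock<std::mutex> Lock() {
    return std::unique_lock<std::mutex>(mu_);
  }

 private:
  std::mutex mu_;
};

// Hypothetical stand-in for RaftConsensus: 'pending_' is a plain member that
// is only touched while the lock inside 'state_' is held.
class Owner {
 public:
  void AddEntry(int64_t index, std::string payload) {
    std::unique_lock<std::mutex> lock = state_.Lock();
    AddEntryUnlocked(index, std::move(payload));
  }

  int64_t NumEntries() {
    std::unique_lock<std::mutex> lock = state_.Lock();
    return pending_.Count();
  }

 private:
  // "Unlocked" suffix: the caller must already hold state_'s lock.
  void AddEntryUnlocked(int64_t index, std::string payload) {
    pending_.Add(index, std::move(payload));
  }

  StateWithLock state_;
  Tracker pending_;  // Protected by the lock inside state_.
};
```

Because the tracker has no synchronization of its own, moving to finer-grained locking later should only require changing where the owner acquires its lock, not the tracker's interface.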

There were several places in RaftConsensus which were using the
committed index/term tracked by PendingRounds, but it seems plausible
that these would be better tracked elsewhere. I left TODOs to deal with
this rather than making non-mechanical changes in this patch.

[1] https://www.youtube.com/watch?v=ZXRb6nPcx10

Change-Id: I95308ae8a5d65ada016ae08e0e8cf06c54b35909
Reviewed-on: http://gerrit.cloudera.org:8080/4713
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/27992c12
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/27992c12
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/27992c12

Branch: refs/heads/master
Commit: 27992c120e1aebe5597665d981e8efc0586aab55
Parents: 65cb2ed
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Oct 12 21:01:45 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Oct 20 21:28:45 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            |  38 +--
 src/kudu/consensus/raft_consensus.h             |   7 +-
 .../consensus/raft_consensus_quorum-test.cc     |  15 +-
 src/kudu/consensus/raft_consensus_state.cc      | 229 +++++++++----------
 src/kudu/consensus/raft_consensus_state.h       | 155 ++++++-------
 5 files changed, 219 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/27992c12/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 90ab3df..956fb6d 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -234,6 +234,7 @@ RaftConsensus::RaftConsensus(
       txn_factory_(txn_factory),
       peer_manager_(std::move(peer_manager)),
       queue_(std::move(queue)),
+      pending_(Substitute("T $0 P $1", options.tablet_id, peer_uuid)),
       rng_(GetRandomSeed32()),
       failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
                        GetFailureMonitorCheckStddevMs()),
@@ -291,7 +292,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
     }
 
-    state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id);
+    pending_.SetInitialCommittedOpId(info.last_committed_id);
 
     queue_->Init(info.last_id);
   }
@@ -635,7 +636,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     }
   }
 
-  return state_->AddPendingOperation(round);
+  return pending_.AddPendingOperation(round);
 }
 
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
@@ -648,10 +649,8 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
     return;
   }
 
-  state_->AdvanceCommittedIndexUnlocked(commit_index);
+  pending_.AdvanceCommittedIndex(commit_index);
 
-  // TODO: is this right? the goal is to signal a new request
-  // whenever we have new commit-index to propagate.
   if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
   }
@@ -807,7 +806,8 @@ std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
 
 void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
                                                      LeaderRequest* deduplicated_req) {
-  int64_t last_committed_index = state_->GetCommittedIndexUnlocked();
+  // TODO(todd): use queue committed index?
+  int64_t last_committed_index = pending_.GetCommittedIndex();
 
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
@@ -832,7 +832,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
       // If the index is uncommitted and below our match index, then it must be in the
       // pendings set.
       scoped_refptr<ConsensusRound> round =
-          state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index());
+          pending_.GetPendingOpByIndexOrNull(leader_msg->id().index());
       DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
                     << " in pending set. committed= " << last_committed_index
                     << " dedup=" << dedup_up_to_index;
@@ -895,7 +895,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
                                                                 ConsensusResponsePB* response) {
 
   bool term_mismatch;
-  if (state_->IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
+  if (pending_.IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
     return Status::OK();
   }
 
@@ -932,7 +932,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
 }
 
 void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
-  state_->AbortOpsAfterUnlocked(truncate_after_index);
+  pending_.AbortOpsAfter(truncate_after_index);
   queue_->TruncateOpsAfter(truncate_after_index);
 }
 
@@ -960,7 +960,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
   Status s;
   const OpId* prev = deduped_req->preceding_opid;
   for (const ReplicateRefPtr& message : deduped_req->messages) {
-    s = ReplicaState::CheckOpInSequence(*prev, message->get()->id());
+    s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
     if (PREDICT_FALSE(!s.ok())) {
       LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: "
           << s.ToString() << ". Leader Request: " << request->ShortDebugString();
@@ -999,7 +999,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
   if (!deduped_req->messages.empty()) {
 
     bool term_mismatch;
-    CHECK(!state_->IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));
+    CHECK(!pending_.IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));
 
     // If the index is in our log but the terms are not the same abort down to the leader's
     // preceding id.
@@ -1153,13 +1153,13 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
     // 3. ...the leader's committed index is always our upper bound.
     int64_t early_apply_up_to = std::min<int64_t>({
-        state_->GetLastPendingTransactionOpIdUnlocked().index(),
+        pending_.GetLastPendingTransactionOpId().index(),
         deduped_req.preceding_opid->index(),
         request->committed_index()});
 
     VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to;
     TRACE("Early marking committed up to index $0", early_apply_up_to);
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(early_apply_up_to));
+    CHECK_OK(pending_.AdvanceCommittedIndex(early_apply_up_to));
 
     // 2 - Enqueue the prepares
 
@@ -1267,7 +1267,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 
     VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
     TRACE("Marking committed up to $0", apply_up_to);
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to));
+    CHECK_OK(pending_.AdvanceCommittedIndex(apply_up_to));
     queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());
 
     // If any messages failed to be started locally, then we already have removed them
@@ -1561,7 +1561,7 @@ void RaftConsensus::Shutdown() {
   // We must close the queue after we close the peers.
   queue_->Close();
 
-  CHECK_OK(state_->CancelPendingTransactions());
+  CHECK_OK(pending_.CancelPendingTransactions());
 
   {
     ReplicaState::UniqueLock lock;
@@ -1797,7 +1797,9 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // in the queue -- when the queue is in LEADER mode, it checks that all
   // registered peers are a part of the active config.
   peer_manager_->Close();
-  queue_->SetLeaderMode(state_->GetCommittedIndexUnlocked(),
+  // TODO(todd): should use queue committed index here? in that case do
+  // we need to pass it in at all?
+  queue_->SetLeaderMode(pending_.GetCommittedIndex(),
                         state_->GetCurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
@@ -1983,8 +1985,8 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
-    id->set_term(state_->GetTermWithLastCommittedOpUnlocked());
-    id->set_index(state_->GetCommittedIndexUnlocked());
+    id->set_term(pending_.GetTermWithLastCommittedOp());
+    id->set_index(pending_.GetCommittedIndex());
   } else {
     return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/27992c12/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 2cd80c6..3a67087 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -455,9 +455,14 @@ class RaftConsensus : public Consensus,
 
   gscoped_ptr<ReplicaState> state_;
 
+  // The currently pending rounds that have not yet been committed by
+  // consensus. Protected by the locks inside state_.
+  // TODO(todd) these locks will become more fine-grained.
+  PendingRounds pending_;
+
   Random rng_;
 
-  // TODO: Plumb this from ServerBase.
+  // TODO(mpercy): Plumb this from ServerBase.
   RandomizedFailureMonitor failure_monitor_;
 
   scoped_refptr<FailureDetector> failure_detector_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/27992c12/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index d9336ac..7e7c80d 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -303,19 +303,14 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     scoped_refptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
-    ReplicaState* state = peer->GetReplicaStateForTests();
 
     int backoff_exp = 0;
     const int kMaxBackoffExp = 8;
-    int64_t committed_op_idx;
+    OpId committed;
     while (true) {
-      {
-        ReplicaState::UniqueLock lock;
-        CHECK_OK(state->LockForRead(&lock));
-        committed_op_idx = state->GetCommittedIndexUnlocked();
-        if (committed_op_idx >= to_wait_for) {
-          return;
-        }
+      if (peer->GetLastOpId(COMMITTED_OPID, &committed).ok() &&
+          committed.index() >= to_wait_for) {
+        return;
       }
       if (MonoTime::Now() > (start + timeout)) {
         break;
@@ -326,7 +321,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of "
                << "op " << to_wait_for << " on replica. Last committed op on replica: "
-               << committed_op_idx << ". Dumping state and quitting.";
+               << committed.index() << ". Dumping state and quitting.";
     vector<string> lines;
     scoped_refptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));

http://git-wip-us.apache.org/repos/asf/kudu/blob/27992c12/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 522e728..7b9454a 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -47,7 +47,6 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
     : options_(std::move(options)),
       peer_uuid_(std::move(peer_uuid)),
       cmeta_(std::move(cmeta)),
-      last_committed_op_id_(MinimumOpId()),
       state_(kInitialized) {
   CHECK(cmeta_) << "ConsensusMeta passed as NULL";
 }
@@ -250,26 +249,6 @@ const RaftConfigPB& ReplicaState::GetActiveConfigUnlocked() const {
   return cmeta_->active_config();
 }
 
-bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
-
-  *term_mismatch = false;
-
-  if (op_id.index() <= GetCommittedIndexUnlocked()) {
-    return true;
-  }
-
-  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
-  if (!round) {
-    return false;
-  }
-
-  if (round->id().term() != op_id.term()) {
-    *term_mismatch = true;
-    return false;
-  }
-  return true;
-}
-
 Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term,
                                             FlushToDisk flush) {
   TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked",
@@ -332,39 +311,77 @@ const ConsensusOptions& ReplicaState::GetOptions() const {
   return options_;
 }
 
-int ReplicaState::GetNumPendingTxnsUnlocked() const {
+string ReplicaState::LogPrefix() {
+  ReplicaState::UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return LogPrefixUnlocked();
+}
+
+string ReplicaState::LogPrefixUnlocked() const {
   DCHECK(update_lock_.is_locked());
-  return pending_txns_.size();
+  return Substitute("T $0 P $1 [term $2 $3]: ",
+                    options_.tablet_id,
+                    peer_uuid_,
+                    GetCurrentTermUnlocked(),
+                    RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
 }
 
-Status ReplicaState::CancelPendingTransactions() {
-  {
-    ThreadRestrictions::AssertWaitAllowed();
-    UniqueLock lock(update_lock_);
-    if (state_ != kShuttingDown) {
-      return Status::IllegalState("Can only wait for pending commits on kShuttingDown state.");
-    }
-    if (pending_txns_.empty()) {
-      return Status::OK();
-    }
+string ReplicaState::LogPrefixThreadSafe() const {
+  return Substitute("T $0 P $1: ",
+                    options_.tablet_id,
+                    peer_uuid_);
+}
 
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Trying to abort " << pending_txns_.size()
-                                   << " pending transactions.";
-    for (const auto& txn : pending_txns_) {
-      const scoped_refptr<ConsensusRound>& round = txn.second;
-      // We cancel only transactions whose applies have not yet been triggered.
-      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting transaction as it isn't in flight: "
-                            << txn.second->replicate_msg()->ShortDebugString();
-      round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
-    }
+ReplicaState::State ReplicaState::state() const {
+  DCHECK(update_lock_.is_locked());
+  return state_;
+}
+
+string ReplicaState::ToString() const {
+  ThreadRestrictions::AssertWaitAllowed();
+  ReplicaState::UniqueLock lock(update_lock_);
+  return ToStringUnlocked();
+}
+
+string ReplicaState::ToStringUnlocked() const {
+  DCHECK(update_lock_.is_locked());
+  return Substitute("Replica: $0, State: $1, Role: $2",
+                    peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+}
+
+//------------------------------------------------------------
+// PendingRounds
+// TODO(todd): move to its own file
+//------------------------------------------------------------
+
+PendingRounds::PendingRounds(string log_prefix)
+    : log_prefix_(std::move(log_prefix)),
+      last_committed_op_id_(MinimumOpId()) {
+}
+PendingRounds::~PendingRounds() {
+}
+
+Status PendingRounds::CancelPendingTransactions() {
+  ThreadRestrictions::AssertWaitAllowed();
+  if (pending_txns_.empty()) {
+    return Status::OK();
+  }
+
+  LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size()
+                                 << " pending transactions.";
+  for (const auto& txn : pending_txns_) {
+    const scoped_refptr<ConsensusRound>& round = txn.second;
+    // We cancel only transactions whose applies have not yet been triggered.
+    LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: "
+                                   << txn.second->replicate_msg()->ShortDebugString();
+    round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
   }
   return Status::OK();
 }
 
-void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
-  DCHECK(update_lock_.is_locked());
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting all transactions after (but not including): "
-      << index << ". Current State: " << ToStringUnlocked();
+void PendingRounds::AbortOpsAfter(int64_t index) {
+  LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) "
+                                 << index;
 
   DCHECK_GE(index, 0);
   OpId new_preceding;
@@ -384,7 +401,7 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
   for (; iter != pending_txns_.end();) {
     const scoped_refptr<ConsensusRound>& round = (*iter).second;
     auto op_type = round->replicate_msg()->op_type();
-    LOG_WITH_PREFIX_UNLOCKED(INFO)
+    LOG_WITH_PREFIX(INFO)
         << "Aborting uncommitted " << OperationType_Name(op_type)
         << " operation due to leader change: " << round->replicate_msg()->id();
 
@@ -394,28 +411,47 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
   }
 }
 
-Status ReplicaState::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
-  DCHECK(update_lock_.is_locked());
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Cannot trigger prepare. Replica is not in kRunning state.");
-  }
-
+Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
   InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
   return Status::OK();
 }
 
-scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(int64_t index) {
-  DCHECK(update_lock_.is_locked());
+scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
   return FindPtrOrNull(pending_txns_, index);
 }
 
-Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
+bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
+
+  *term_mismatch = false;
+
+  if (op_id.index() <= GetCommittedIndex()) {
+    return true;
+  }
+
+  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
+  if (!round) {
+    return false;
+  }
+
+  if (round->id().term() != op_id.term()) {
+    *term_mismatch = true;
+    return false;
+  }
+  return true;
+}
+
+OpId PendingRounds::GetLastPendingTransactionOpId() const {
+  return pending_txns_.empty()
+      ? MinimumOpId() : (--pending_txns_.end())->second->id();
+}
+
+Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
   // If we already committed up to (or past) 'id' return.
   // This can happen in the case that multiple UpdateConsensus() calls end
   // up in the RPC queue at the same time, and then might get interleaved out
   // of order.
   if (last_committed_op_id_.index() >= committed_index) {
-    VLOG_WITH_PREFIX_UNLOCKED(1)
+    VLOG_WITH_PREFIX(1)
      << "Already marked ops through " << last_committed_op_id_ << " as committed. "
      << "Now trying to mark " << committed_index << " which would be a no-op.";
     return Status::OK();
@@ -426,7 +462,7 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
                << " from " << last_committed_op_id_
                << " we have no pending txns"
                << GetStackTrace();
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "No transactions to mark as committed up to: "
+    VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: "
                                  << committed_index;
     return Status::OK();
   }
@@ -437,7 +473,7 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
   auto end_iter = pending_txns_.upper_bound(committed_index);
   CHECK(iter != pending_txns_.end());
 
-  VLOG_WITH_PREFIX_UNLOCKED(1) << "Last triggered apply was: "
+  VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
       <<  last_committed_op_id_
       << " Starting to apply from log index: " << (*iter).first;
 
@@ -458,8 +494,7 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
   return Status::OK();
 }
 
-
-Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) {
+Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
   CHECK_EQ(last_committed_op_id_.index(), 0);
   if (!pending_txns_.empty()) {
     int64_t first_pending_index = pending_txns_.begin()->first;
@@ -473,7 +508,7 @@ Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) {
       last_committed_op_id_ = committed_op;
     }
 
-    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(committed_op.index()));
+    RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
     CHECK_EQ(last_committed_op_id_.ShortDebugString(),
              committed_op.ShortDebugString());
 
@@ -483,63 +518,7 @@ Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) {
   return Status::OK();
 }
 
-int64_t ReplicaState::GetCommittedIndexUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_committed_op_id_.index();
-}
-
-int64_t ReplicaState::GetTermWithLastCommittedOpUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_committed_op_id_.term();
-}
-
-OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return pending_txns_.empty()
-      ? MinimumOpId() : (--pending_txns_.end())->second->id();
-}
-
-
-string ReplicaState::LogPrefix() {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
-  return LogPrefixUnlocked();
-}
-
-string ReplicaState::LogPrefixUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return Substitute("T $0 P $1 [term $2 $3]: ",
-                    options_.tablet_id,
-                    peer_uuid_,
-                    GetCurrentTermUnlocked(),
-                    RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
-}
-
-string ReplicaState::LogPrefixThreadSafe() const {
-  return Substitute("T $0 P $1: ",
-                    options_.tablet_id,
-                    peer_uuid_);
-}
-
-ReplicaState::State ReplicaState::state() const {
-  DCHECK(update_lock_.is_locked());
-  return state_;
-}
-
-string ReplicaState::ToString() const {
-  ThreadRestrictions::AssertWaitAllowed();
-  ReplicaState::UniqueLock lock(update_lock_);
-  return ToStringUnlocked();
-}
-
-string ReplicaState::ToStringUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return Substitute("Replica: $0, State: $1, Role: $2, Last Committed: $3",
-                    peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()),
-                    OpIdToString(last_committed_op_id_));
-}
-
-Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current) {
+Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
   if (current.term() < previous.term()) {
     return Status::Corruption(Substitute("New operation's term is not >= than the previous "
         "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
@@ -551,6 +530,18 @@ Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current
   return Status::OK();
 }
 
+int64_t PendingRounds::GetCommittedIndex() const {
+  return last_committed_op_id_.index();
+}
+
+int64_t PendingRounds::GetTermWithLastCommittedOp() const {
+  return last_committed_op_id_.term();
+}
+
+int PendingRounds::GetNumPendingTxns() const {
+  return pending_txns_.size();
+}
+
 }  // namespace consensus
 }  // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/27992c12/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index dc875b5..db8af70 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -28,8 +28,8 @@
 #include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/log_util.h"
+#include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/status.h"
@@ -45,7 +45,7 @@ class Messenger;
 
 namespace consensus {
 
-// Class that coordinates access to the replica state (independently of Role).
+// Class that coordinates access to the persistent Raft state (independently of Role).
 // This has a 1-1 relationship with RaftConsensus and is essentially responsible for
 // keeping state and checking if state changes are viable.
 //
@@ -54,20 +54,9 @@ namespace consensus {
 // considered to be the pending configuration if it is non-null, otherwise the
 // committed configuration is the active configuration.
 //
-// When a replica becomes a leader of a configuration, it sets the pending configuration to
-// a new configuration declaring itself as leader and sets its "active" role to LEADER.
-// It then starts up ConsensusPeers for each member of the pending configuration and
-// tries to push a new configuration to the peers. Once that configuration is
-// pushed to a majority of the cluster, it is considered committed and the
-// replica flushes that configuration to disk as the committed configuration.
-//
-// Each time an operation is to be performed on the replica the appropriate LockFor*()
-// method should be called. The LockFor*() methods check that the replica is in the
-// appropriate state to perform the requested operation and returns the lock or return
-// Status::IllegalState if that is not the case.
-//
-// All state reading/writing methods acquire the lock, unless suffixed by "Unlocked", in
-// which case a lock should be obtained prior to calling them.
+// TODO(todd): Currently this also performs some coarse-grained locking across the consensus
+// implementation in addition to providing a fairly thin wrapper around ConsensusMetadata.
+// The class should be renamed at the least and probably substantially simplified.
 class ReplicaState {
  public:
   enum State {
@@ -89,8 +78,6 @@ class ReplicaState {
 
   typedef std::unique_lock<simple_spinlock> UniqueLock;
 
-  typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
-
   ReplicaState(ConsensusOptions options, std::string peer_uuid,
                std::unique_ptr<ConsensusMetadata> cmeta);
 
@@ -159,13 +146,6 @@ class ReplicaState {
   // configuration change pending.
   Status CheckNoConfigChangePendingUnlocked() const;
 
-  // Returns true if an operation is in this replica's log, namely:
-  // - If the op's index is lower than or equal to our committed index
-  // - If the op id matches an inflight op.
-  // If an operation with the same index is in our log but the terms
-  // are different 'term_mismatch' is set to true, it is false otherwise.
-  bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
-
   // Sets the given configuration as pending commit. Does not persist into the peers
   // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
   Status SetPendingConfigUnlocked(const RaftConfigPB& new_config) WARN_UNUSED_RESULT;
@@ -228,47 +208,6 @@ class ReplicaState {
 
   const ConsensusOptions& GetOptions() const;
 
-  // Aborts pending operations after, but not including 'index'. The OpId with 'index'
-  // will become our new last received id. If there are pending operations with indexes
-  // higher than 'index' those operations are aborted.
-  void AbortOpsAfterUnlocked(int64_t index);
-
-  // Returns the the ConsensusRound with the provided index, if there is any, or NULL
-  // if there isn't.
-  scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNullUnlocked(int64_t index);
-
-  // Add 'round' to the set of rounds waiting to be committed.
-  Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
-
-  // Advances the committed index.
-  // This is a no-op if the committed index has not changed.
-  Status AdvanceCommittedIndexUnlocked(int64_t committed_index);
-
-  // Set the committed op during startup. This should be done after
-  // appending any of the pending transactions, and will take care
-  // of triggering any that are now considered committed.
-  Status SetInitialCommittedOpIdUnlocked(const OpId& committed_op);
-
-  // Returns the watermark below which all operations are known to
-  // be committed according to consensus.
-  //
-  // This must be called under a lock.
-  int64_t GetCommittedIndexUnlocked() const;
-  int64_t GetTermWithLastCommittedOpUnlocked() const;
-
-  // Returns the id of the latest pending transaction (i.e. the one with the
-  // latest index). This must be called under the lock.
-  OpId GetLastPendingTransactionOpIdUnlocked() const;
-
-  // Used by replicas to cancel pending transactions. Pending transaction are those
-  // that have completed prepare/replicate but are waiting on the LEADER's commit
-  // to complete. This does not cancel transactions being applied.
-  Status CancelPendingTransactions();
-
-  // Returns the number of transactions that are currently in the pending state
-  // i.e. transactions for which Prepare() is done or under way.
-  int GetNumPendingTxnsUnlocked() const;
-
   std::string ToString() const;
   std::string ToStringUnlocked() const;
 
@@ -282,10 +221,6 @@ class ReplicaState {
   // information, but does not require the lock.
   std::string LogPrefixThreadSafe() const;
 
-  // Checks that 'current' correctly follows 'previous'. Specifically it checks
-  // that the term is the same or higher and that the index is sequential.
-  static Status CheckOpInSequence(const OpId& previous, const OpId& current);
-
   // Return the current state of this object.
   // The update_lock_ must be held.
   ReplicaState::State state() const;
@@ -305,19 +240,85 @@ class ReplicaState {
   // Consensus metadata persistence object.
   std::unique_ptr<ConsensusMetadata> cmeta_;
 
+  State state_;
+};
+
+// Tracks the pending consensus rounds being managed by a Raft replica (either leader
+// or follower).
+//
+// This class is not thread-safe.
+//
+// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction".
+// We should consolidate to "round".
+class PendingRounds {
+ public:
+  explicit PendingRounds(std::string log_prefix);
+  ~PendingRounds();
+
+  // Set the committed op during startup. This should be done after
+  // appending any of the pending transactions, and will take care
+  // of triggering any that are now considered committed.
+  Status SetInitialCommittedOpId(const OpId& committed_op);
+
+  // Returns the the ConsensusRound with the provided index, if there is any, or NULL
+  // if there isn't.
+  scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);
+
+  // Add 'round' to the set of rounds waiting to be committed.
+  Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
+
+  // Advances the committed index.
+  // This is a no-op if the committed index has not changed.
+  Status AdvanceCommittedIndex(int64_t committed_index);
+
+  // Aborts pending operations after, but not including 'index'. The OpId with 'index'
+  // will become our new last received id. If there are pending operations with indexes
+  // higher than 'index' those operations are aborted.
+  void AbortOpsAfter(int64_t index);
+
+  // Returns true if an operation is in this replica's log, namely:
+  // - If the op's index is lower than or equal to our committed index
+  // - If the op id matches an inflight op.
+  // If an operation with the same index is in our log but the terms
+  // are different 'term_mismatch' is set to true, it is false otherwise.
+  bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
+
+  // Returns the id of the latest pending transaction (i.e. the one with the
+  // latest index). This must be called under the lock.
+  OpId GetLastPendingTransactionOpId() const;
+
+  // Used by replicas to cancel pending transactions. Pending transaction are those
+  // that have completed prepare/replicate but are waiting on the LEADER's commit
+  // to complete. This does not cancel transactions being applied.
+  Status CancelPendingTransactions();
+
+  // Returns the number of transactions that are currently in the pending state
+  // i.e. transactions for which Prepare() is done or under way.
+  int GetNumPendingTxns() const;
+
+  // Returns the watermark below which all operations are known to
+  // be committed according to consensus.
+  // TODO(todd): these should probably be removed in favor of using the queue.
+  int64_t GetCommittedIndex() const;
+  int64_t GetTermWithLastCommittedOp() const;
+
+  // Checks that 'current' correctly follows 'previous'. Specifically it checks
+  // that the term is the same or higher and that the index is sequential.
+  static Status CheckOpInSequence(const OpId& previous, const OpId& current);
+
+ private:
+  const std::string& LogPrefix() const { return log_prefix_; }
+
+  const std::string log_prefix_;
+
   // Index=>Round map that manages pending ops, i.e. operations for which we've
   // received a replicate message from the leader but have yet to be committed.
   // The key is the index of the replicate operation.
+  typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
   IndexToRoundMap pending_txns_;
 
-  // The OpId of the Apply that was last triggered when the last message from the leader
-  // was received. Initialized to MinimumOpId().
-  //
-  // TODO: are there cases where this doesn't track the actual commit index,
-  // if there are no-ops?
+  // The OpId of the round that was last committed. Initialized to MinimumOpId().
   OpId last_committed_op_id_;
-
-  State state_;
 };
 
 }  // namespace consensus

