kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [2/3] kudu git commit: consensus: Get rid of LockFor*() methods
Date Thu, 01 Jun 2017 20:51:00 GMT
consensus: Get rid of LockFor*() methods

Simplify the locking logic by removing layers of abstraction.

Also add State_Name() helper for state-related error messages.

Change-Id: I6858752f4fbeb70b09eb4375c52e4aeaa1bb8e71
Reviewed-on: http://gerrit.cloudera.org:8080/7012
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3846861a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3846861a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3846861a

Branch: refs/heads/master
Commit: 3846861ab258a0ac0497893865875b2138964fe3
Parents: 30682fd
Author: Mike Percy <mpercy@apache.org>
Authored: Tue May 30 14:00:53 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Jun 1 20:44:45 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            | 298 +++++++++----------
 src/kudu/consensus/raft_consensus.h             |  68 ++---
 .../consensus/raft_consensus_quorum-test.cc     |   9 +-
 3 files changed, 169 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c5370c7..6a2c39f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -278,8 +278,11 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
                                                         failure_detector_));
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForStart(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for
Start(): "
+                                   << State_Name(state_);
+
     ClearLeaderUnlocked();
 
     // Our last persisted term can be higher than the last persisted operation
@@ -310,8 +313,9 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
   }
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
 
     RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
 
@@ -350,15 +354,19 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info)
{
 }
 
 bool RaftConsensus::IsRunning() const {
-  UniqueLock lock;
-  Status s = LockForRead(&lock);
-  if (PREDICT_FALSE(!s.ok())) return false;
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return state_ == kRunning;
 }
 
 Status RaftConsensus::EmulateElection() {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForConfigChange(&lock));
+  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
 
@@ -404,8 +412,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason
reason) {
                "mode", mode_str);
   scoped_refptr<LeaderElection> election;
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
 
     RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
     if (active_role == RaftPeerPB::LEADER) {
@@ -413,7 +422,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason
reason) {
       return Status::OK();
     }
     if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
-      SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
+      RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this
state.
       return Status::IllegalState("Not starting election: Node is currently "
                                   "a non-participant in the raft config",
                                   SecureShortDebugString(GetActiveConfigUnlocked()));
@@ -499,8 +508,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout)
{
 
 Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForConfigChange(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
   if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
     resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
     StatusToPB(Status::IllegalState("Not currently leader"),
@@ -580,8 +590,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>&
round) {
 
   std::lock_guard<simple_spinlock> lock(update_lock_);
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
     RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
     RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
   }
@@ -591,8 +602,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>&
round) {
 }
 
 Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>&
round) {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
   round->BindToTerm(GetCurrentTermUnlocked());
   return Status::OK();
 }
@@ -655,12 +667,19 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
 }
 
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
-  UniqueLock lock;
-  Status s = LockForCommit(&lock);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << LogPrefixThreadSafe()
-        << "Unable to take state lock to update committed index: "
-        << s.ToString();
+  TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
+               "tablet", options_.tablet_id,
+               "commit_index", commit_index);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  // We will process commit notifications while shutting down because a replica
+  // which has initiated a Prepare() / Replicate() may eventually commit even if
+  // its state has changed after the initial Append() / Update().
+  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
+                                      << "Replica not in running state: "
+                                      << State_Name(state_);
     return;
   }
 
@@ -672,11 +691,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
 }
 
 void RaftConsensus::NotifyTermChange(int64_t term) {
-  UniqueLock lock;
-  Status s = LockForConfigChange(&lock);
+  TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
+               "tablet", options_.tablet_id,
+               "term", term);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  Status s = CheckRunningUnlocked();
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term
change"
-                 << " when notified of new term " << term << ": " <<
s.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term
"
+                                      << "(" << term << "): " <<
s.ToString();
     return;
   }
   WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
@@ -697,14 +721,8 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
 
   RaftConfigPB committed_config;
   {
-    UniqueLock lock;
-    Status s = LockForRead(&lock);
-    if (PREDICT_FALSE(!s.ok())) {
-      LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
-                   << "Unable to lock consensus for read: " << s.ToString();
-      return;
-    }
-
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     int64_t current_term = GetCurrentTermUnlocked();
     if (current_term != term) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower
failure in "
@@ -794,8 +812,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr&
msg
 }
 
 Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   const RaftConfigPB& config = GetCommittedConfigUnlocked();
   const string& uuid = peer_uuid_;
   if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
@@ -1142,8 +1160,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
   LeaderRequest deduped_req;
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForUpdate(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
+    if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of
the config";
+    }
 
     deduped_req.leader_uuid = request->caller_uuid();
 
@@ -1333,9 +1355,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // If just waiting for our log append to finish lets snooze the timer.
       // We don't want to fire leader election because we're waiting on our own log.
       if (s.IsTimedOut()) {
-        UniqueLock lock;
-        RETURN_NOT_OK(LockForRead(&lock));
-        SnoozeFailureDetectorUnlocked();
+        ThreadRestrictions::AssertWaitAllowed();
+        LockGuard l(lock_);
+        RETURN_NOT_OK(SnoozeFailureDetectorUnlocked());
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
@@ -1393,14 +1415,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
     // timeouts, just vote a quick NO.
     //
     // We still need to take the state lock in order to respond with term info, etc.
-    UniqueLock state_guard;
-    RETURN_NOT_OK(LockForConfigChange(&state_guard));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
     return RequestVoteRespondIsBusy(request, response);
   }
 
   // Acquire the replica state lock so we can read / modify the consensus state.
-  UniqueLock state_guard;
-  RETURN_NOT_OK(LockForConfigChange(&state_guard));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
@@ -1480,6 +1504,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                                    const StatusCallback& client_cb,
                                    boost::optional<TabletServerErrorPB::Code>* error_code)
{
+  TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
   if (PREDICT_FALSE(!req.has_type())) {
     return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()",
                                    SecureShortDebugString(req));
@@ -1492,8 +1520,9 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB&
req,
   ChangeConfigType type = req.type();
   const RaftPeerPB& server = req.server();
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
     RETURN_NOT_OK(CheckActiveLeaderUnlocked());
     RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
 
@@ -1599,8 +1628,8 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB&
req,
   {
     // Take the snapshot of the replica state and queue state so that
     // we can stick them in the consensus update request later.
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     current_term = GetCurrentTermUnlocked();
     committed_config = GetCommittedConfigUnlocked();
     if (IsConfigChangePendingUnlocked()) {
@@ -1719,6 +1748,10 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB&
req,
 }
 
 void RaftConsensus::Shutdown() {
+  TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
   // Avoid taking locks if already shut down so we don't violate
   // ThreadRestrictions assertions in the case where the RaftConsensus
   // destructor runs on the reactor thread due to an election callback being
@@ -1726,9 +1759,11 @@ void RaftConsensus::Shutdown() {
   if (shutdown_.Load(kMemOrderAcquire)) return;
 
   {
-    UniqueLock lock;
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     // Transition to kShuttingDown state.
-    CHECK_OK(LockForShutdown(&lock));
+    CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by
'shutdown_'.
+    state_ = kShuttingDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
   }
 
@@ -1739,10 +1774,10 @@ void RaftConsensus::Shutdown() {
   queue_->Close();
 
   {
-    UniqueLock lock;
-    CHECK_OK(LockForShutdown(&lock));
-    CHECK_EQ(kShuttingDown, state_);
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     CHECK_OK(pending_.CancelPendingTransactions());
+    CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
     state_ = kShutDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
@@ -1779,8 +1814,9 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr&
msg
 }
 
 Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
-  UniqueLock lock;
-  CHECK_OK(LockForConfigChange(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  CHECK_OK(CheckRunningUnlocked());
   return HandleTermAdvanceUnlocked(new_term);
 }
 
@@ -1914,11 +1950,27 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB*
request
 }
 
 RaftPeerPB::Role RaftConsensus::role() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return GetActiveRoleUnlocked();
 }
 
+const char* RaftConsensus::State_Name(State state) {
+  switch (state) {
+    case kInitialized:
+      return "Initialized";
+    case kRunning:
+      return "Running";
+    case kShuttingDown:
+      return "Shutting down";
+    case kShutDown:
+      return "Shut down";
+    default:
+      LOG(DFATAL) << "Unknown State value: " << state;
+      return "Unknown";
+  }
+}
+
 void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
   DCHECK(lock_.is_locked());
   failed_elections_since_stable_leader_ = 0;
@@ -1975,14 +2027,14 @@ const string& RaftConsensus::tablet_id() const {
 }
 
 ConsensusStatePB RaftConsensus::ConsensusState() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return ConsensusStateUnlocked();
 }
 
 RaftConfigPB RaftConsensus::CommittedConfig() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return GetCommittedConfigUnlocked();
 }
 
@@ -1997,8 +2049,8 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   // Dump the queues on a leader.
   RaftPeerPB::Role role;
   {
-    UniqueLock lock;
-    CHECK_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     role = GetActiveRoleUnlocked();
   }
   if (role == RaftPeerPB::LEADER) {
@@ -2026,8 +2078,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const
ElectionResu
 
   // Snooze to avoid the election timer firing again as much as possible.
   {
-    UniqueLock lock;
-    CHECK_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     // We need to snooze when we win and when we lose:
     // - When we win because we're about to disable the timer and become leader.
     // - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2064,12 +2116,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const
ElectionResu
   }
 
   // The vote was granted, become leader.
-  UniqueLock lock;
-  Status s = LockForConfigChange(&lock);
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock lock(lock_);
+  Status s = CheckRunningUnlocked();
   if (PREDICT_FALSE(!s.ok())) {
-    LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback
for term "
-                          << election_term << " while not running: "
-                          << s.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << "
callback for term "
+                                   << election_term << " while not running: "
+                                   << s.ToString();
     return;
   }
 
@@ -2137,8 +2190,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const
ElectionResu
 }
 
 Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
@@ -2192,13 +2245,13 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound*
round,
   // the client callback.
 
   if (!status.ok()) {
-    LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication
failed: "
-              << status.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
+                                   << status.ToString();
     client_cb.Run(status);
     return;
   }
-  VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str <<
" with op id "
-          << round->id();
+  VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with
op id "
+                               << round->id();
   gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
   commit_msg->set_op_type(round->replicate_msg()->op_type());
   *commit_msg->mutable_commited_op_id() = round->id();
@@ -2364,82 +2417,19 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
   return Status::OK();
 }
 
-Status RaftConsensus::LockForStart(UniqueLock* lock) const {
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
-      << " Replica is not in kInitialized state";
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForRead(UniqueLock* lock) const {
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const
{
-  ThreadRestrictions::AssertWaitAllowed();
+Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
+  DCHECK(lock_.is_locked());
   DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-
-  RETURN_NOT_OK(CheckActiveLeaderUnlocked());
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-  if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of
the config";
-  }
-  lock->swap(l);
-  return Status::OK();
+  RETURN_NOT_OK(CheckRunningUnlocked());
+  return CheckActiveLeaderUnlocked();
 }
 
-Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
-
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  // Can only change the config on running replicas.
+Status RaftConsensus::CheckRunningUnlocked() const {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Unable to lock ReplicaState for config change",
-                                Substitute("State = $0", state_));
+    return Status::IllegalState("RaftConsensus is not running",
+                                Substitute("State = $0", State_Name(state_)));
   }
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (state_ != kShuttingDown && state_ != kShutDown) {
-    state_ = kShuttingDown;
-  }
-  lock->swap(l);
   return Status::OK();
 }
 
@@ -2608,8 +2598,8 @@ const ConsensusOptions& RaftConsensus::GetOptions() const {
 }
 
 string RaftConsensus::LogPrefix() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return LogPrefixUnlocked();
 }
 
@@ -2630,14 +2620,14 @@ string RaftConsensus::LogPrefixThreadSafe() const {
 
 string RaftConsensus::ToString() const {
   ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock lock(lock_);
+  LockGuard l(lock_);
   return ToStringUnlocked();
 }
 
 string RaftConsensus::ToStringUnlocked() const {
   DCHECK(lock_.is_locked());
   return Substitute("Replica: $0, State: $1, Role: $2",
-                    peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+                    peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
 }
 
 ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b1badde..b9348c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -35,9 +35,6 @@
 
 namespace kudu {
 
-typedef std::lock_guard<simple_spinlock> Lock;
-typedef gscoped_ptr<Lock> ScopedLock;
-
 class Counter;
 class FailureDetector;
 class HostPort;
@@ -62,8 +59,6 @@ struct ElectionResult;
 class RaftConsensus : public Consensus,
                       public PeerMessageQueueObserver {
  public:
-  typedef std::unique_lock<simple_spinlock> UniqueLock;
-
   static scoped_refptr<RaftConsensus> Create(
     ConsensusOptions options,
     std::unique_ptr<ConsensusMetadata> cmeta,
@@ -181,6 +176,8 @@ class RaftConsensus : public Consensus,
   FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
   FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
 
+  // NOTE: When adding / changing values in this enum, add the corresponding
+  // values to State_Name().
   enum State {
     // State after the replica is built.
     kInitialized,
@@ -224,6 +221,12 @@ class RaftConsensus : public Consensus,
     std::string OpsRangeString() const;
   };
 
+  using LockGuard = std::lock_guard<simple_spinlock>;
+  using UniqueLock = std::unique_lock<simple_spinlock>;
+
+  // Returns string description for State enum value.
+  static const char* State_Name(State state);
+
   // Set the leader UUID of the configuration and mark the tablet config dirty for
   // reporting to the master.
   void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -276,7 +279,8 @@ class RaftConsensus : public Consensus,
   // pending operations, we proactively abort those pending operations after and including
   // the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache.
   Status EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
-                                                   ConsensusResponsePB* response);
+                                                   ConsensusResponsePB* response)
+         WARN_UNUSED_RESULT;
 
   // Check a request received from a leader, making sure:
   // - The request is in the right term
@@ -288,7 +292,7 @@ class RaftConsensus : public Consensus,
   // the messages to add to our state machine.
   Status CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                     ConsensusResponsePB* response,
-                                    LeaderRequest* deduped_req);
+                                    LeaderRequest* deduped_req) WARN_UNUSED_RESULT;
 
   // Abort any pending operations after the given op index,
   // and also truncate the LogCache accordingly.
@@ -382,13 +386,13 @@ class RaftConsensus : public Consensus,
   // When this is called a failure is guaranteed not to be detected
   // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
   // 'FLAGS_raft_heartbeat_interval_ms' has elapsed.
-  Status SnoozeFailureDetectorUnlocked();
+  Status SnoozeFailureDetectorUnlocked() WARN_UNUSED_RESULT;
 
   // Like the above but adds 'additional_delta' to the default timeout
   // period. If allow_logging is set to ALLOW_LOGGING, then this method
   // will print a log message when called.
   Status SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
-                                       AllowLogging allow_logging);
+                                       AllowLogging allow_logging) WARN_UNUSED_RESULT;
 
   // Return the minimum election timeout. Due to backoff and random
   // jitter, election timeouts may be longer than this.
@@ -462,45 +466,17 @@ class RaftConsensus : public Consensus,
   // (see Diego Ongaro's thesis section 4.1).
   Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
 
-  // Locks a replica in preparation for StartUnlocked(). Makes
-  // sure the replica is in kInitialized state.
-  Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Obtains the lock for a state read, does not check state.
-  Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until the critical section of an append completes,
-  // i.e. until the replicate message has been assigned an id and placed in
-  // the log queue.
-  // This also checks that the replica is in the appropriate
-  // state (role) to replicate the provided operation, that the operation
-  // contains a replicate message and is of the appropriate type, and returns
-  // Status::IllegalState if that is not the case.
-  Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until the critical section of a commit completes.
-  // This succeeds for all states since a replica which has initiated
-  // a Prepare()/Replicate() must eventually commit even if it's state
-  // has changed after the initial Append()/Update().
-  Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until an the critical section of an update completes.
-  // Further updates from the same or some other leader will be blocked until
-  // this completes. This also checks that the replica is in the appropriate
-  // state (role) to be updated and returns Status::IllegalState if that
-  // is not the case.
-  Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Changes the role to non-participant and returns a lock that can be
-  // used to make sure no state updates come in until Shutdown() is
-  // completed.
-  Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+  // Checks that the replica is in the appropriate state and role to replicate
+  // the provided operation and that the replicate message does not yet have an
+  // OpId assigned.
+  Status CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+  // Return Status::IllegalState if 'state_' != kRunning, OK otherwise.
+  Status CheckRunningUnlocked() const WARN_UNUSED_RESULT;
 
   // Ensure the local peer is the active leader.
   // Returns OK if leader, IllegalState otherwise.
-  Status CheckActiveLeaderUnlocked() const;
+  Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
 
   // Return current consensus state summary.
   ConsensusStatePB ConsensusStateUnlocked() const;
@@ -515,7 +491,7 @@ class RaftConsensus : public Consensus,
   // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
   // currently *no* configuration change pending, and IllegalState is there *is* a
   // configuration change pending.
-  Status CheckNoConfigChangePendingUnlocked() const;
+  Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT;
 
   // Sets the given configuration as pending commit. Does not persist into the peers
   // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.

http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index bb1575e..a0933c8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -660,8 +660,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind)
{
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
 
-    RaftConsensus::UniqueLock lock;
-    ASSERT_OK(follower0->LockForRead(&lock));
+    RaftConsensus::LockGuard l(follower0->lock_);
 
     // If the locked replica would stop consensus we would hang here
     // as we wait for operations to be replicated to a majority.
@@ -703,13 +702,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind)
{
     // and never letting them go.
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
-    RaftConsensus::UniqueLock lock0;
-    ASSERT_OK(follower0->LockForRead(&lock0));
+    RaftConsensus::LockGuard l_0(follower0->lock_);
 
     scoped_refptr<RaftConsensus> follower1;
     CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
-    RaftConsensus::UniqueLock lock1;
-    ASSERT_OK(follower1->LockForRead(&lock1));
+    RaftConsensus::LockGuard l_1(follower1->lock_);
 
     // Append a single message to the queue
     ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));


Mime
View raw message