kudu-commits mailing list archives

From danburk...@apache.org
Subject kudu git commit: [consensus] add lock coverage for peer queue
Date Sat, 11 Mar 2017 01:14:18 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 80f951236 -> 4a33783ee


[consensus] add lock coverage for peer queue

A Peer or PeerMessageQueue would take its lock without accounting for
a peer callback that runs after a Tablet Copy request and updates the
peer's members without honoring external locks (such as the queue
lock). This resulted in data races.

A PeerMessageQueue operation would take the queue lock on the
assumption that no other thread could touch the peer it was accessing.
That assumption breaks when a PeerProxy holds a callback that accesses
the peer from a separate thread, as the RpcHeartBeater does. For
example, the proxy calls StartTabletCopy with a callback that processes
the response and updates the peer.
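
The queue-side fix in RequestForPeer copies the TrackedPeer, together
with the term and voter count, while holding queue_lock_, and then uses
only that copy after the lock is released. A minimal sketch of the
snapshot-under-lock pattern follows. PeerStateSketch, QueueSketch,
SnapshotPeer and RecordResponse are hypothetical names, and std::mutex
stands in for Kudu's simple_spinlock.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>

// Hypothetical, simplified stand-in for PeerMessageQueue::TrackedPeer.
struct PeerStateSketch {
  std::string uuid;
  int64_t next_index = 1;
  bool is_new = true;
};

// Hypothetical queue: every access to peers_ and current_term_ holds lock_.
class QueueSketch {
 public:
  void TrackPeer(const std::string& uuid) {
    std::lock_guard<std::mutex> l(lock_);
    PeerStateSketch p;
    p.uuid = uuid;
    peers_[uuid] = p;
  }

  // Copies the peer's state and the term under lock_. The caller works on
  // the copy after the lock is dropped, so later writers cannot race with it.
  bool SnapshotPeer(const std::string& uuid, PeerStateSketch* peer,
                    int64_t* current_term) const {
    std::lock_guard<std::mutex> l(lock_);
    auto it = peers_.find(uuid);
    if (it == peers_.end()) return false;
    *peer = it->second;  // a copy, not a pointer into peers_
    *current_term = current_term_;
    return true;
  }

  // A writer, e.g. a response callback running on another thread.
  void RecordResponse(const std::string& uuid, int64_t next_index) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = peers_.find(uuid);
    if (it == peers_.end()) return;
    it->second.next_index = next_index;
    it->second.is_new = false;
  }

 private:
  mutable std::mutex lock_;
  std::map<std::string, PeerStateSketch> peers_;
  int64_t current_term_ = 1;
};
```

The trade-off is one struct copy per request in exchange for never
holding a pointer into the map after the lock is released.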

A Peer operation such as SendNextRequest would take the peer lock under
the same assumption, only to be interrupted by that callback. The
callback held peer_lock_ for only part of its accesses to the peer, so
it could race with the send.
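
After the patch, ProcessTabletCopyResponse holds peer_lock_ for all of
its reads and writes of peer state. It calls lock.unlock() only on the
ALREADY_INPROGRESS path, right before notifying the queue. The commit
does not say why the lock is dropped there; one plausible reading is to
avoid holding peer_lock_ while the queue takes its own lock. A minimal
sketch under that assumption follows. PeerSketch, QueueObserverSketch,
StartRequest and ProcessResponse are hypothetical names, and std::mutex
stands in for simple_spinlock.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the queue: it guards its own state with its
// own mutex and is called without the peer's lock held.
class QueueObserverSketch {
 public:
  void NotifyPeerIsResponsive(const std::string& /*uuid*/) {
    std::lock_guard<std::mutex> l(lock_);
    ++calls_;
  }
  int calls() const {
    std::lock_guard<std::mutex> l(lock_);
    return calls_;
  }

 private:
  mutable std::mutex lock_;
  int calls_ = 0;
};

// Hypothetical, simplified peer.
class PeerSketch {
 public:
  PeerSketch(std::string uuid, QueueObserverSketch* queue)
      : uuid_(std::move(uuid)), queue_(queue) {}

  // Send path: marks a request as in flight under peer_lock_.
  bool StartRequest() {
    std::lock_guard<std::mutex> l(peer_lock_);
    if (closed_ || request_pending_) return false;
    request_pending_ = true;
    return true;
  }

  // Response callback, possibly on another thread. Every access to peer
  // state happens under peer_lock_; the lock is released before calling out.
  void ProcessResponse(bool already_in_progress) {
    std::unique_lock<std::mutex> lock(peer_lock_);
    if (closed_) return;
    assert(request_pending_);
    request_pending_ = false;
    if (already_in_progress) {
      lock.unlock();
      // uuid_ never changes after construction, so reading it unlocked is safe.
      queue_->NotifyPeerIsResponsive(uuid_);
    }
  }

  void Close() {
    std::lock_guard<std::mutex> l(peer_lock_);
    closed_ = true;
  }

 private:
  const std::string uuid_;
  QueueObserverSketch* const queue_;
  std::mutex peer_lock_;
  bool closed_ = false;
  bool request_pending_ = false;
};
```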

This patch adds lock coverage for both cases, each of which was
observed in RaftConsensusITest.TestConfigChangeUnderLoad.
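
One of the covered cases is GetTabletCopyRequestForPeer. Before the
patch, it checked needs_tablet_copy after releasing queue_lock_ and
cleared the flag at the end of the function. It now performs the check,
the reset and the read of current_term inside the same critical section.
A minimal sketch of that check-and-reset pattern follows, using a
hypothetical CopyQueueSketch class with std::mutex in place of
simple_spinlock.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical, simplified queue tracking a single peer's copy flag.
class CopyQueueSketch {
 public:
  void MarkNeedsTabletCopy() {
    std::lock_guard<std::mutex> l(lock_);
    needs_tablet_copy_ = true;
  }

  // Tests and clears the flag, and reads the term, in one critical section,
  // so two concurrent callers cannot both see `true` and both start a copy.
  bool TakeTabletCopyRequest(int64_t* caller_term) {
    std::lock_guard<std::mutex> l(lock_);
    if (!needs_tablet_copy_) return false;
    needs_tablet_copy_ = false;
    *caller_term = current_term_;
    return true;
  }

 private:
  std::mutex lock_;
  bool needs_tablet_copy_ = false;
  int64_t current_term_ = 7;  // arbitrary value for the sketch
};
```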

TSAN errors and descriptions can be found here:
https://github.com/andrwng/kudu/blob/consensus-locks/docs/race_diagnostics/race_descriptions.txt

Full TSAN logs can be found here:
https://github.com/andrwng/kudu/blob/consensus-locks/docs/race_diagnostics/race_type_1.txt
https://github.com/andrwng/kudu/blob/consensus-locks/docs/race_diagnostics/race_type_2.txt

Dist-test results before and after:
Before (for these builds, a status of 1 most likely indicates a data race):
http://dist-test.cloudera.org/job?job_id=awong.1489029805.26963
http://dist-test.cloudera.org/job?job_id=awong.1489086157.18460

After:
http://dist-test.cloudera.org/job?job_id=awong.1489096815.24680

Change-Id: I0e3f9603c471441d9b926cac3377fc6a5a2a1514
Reviewed-on: http://gerrit.cloudera.org:8080/6299
Reviewed-by: David Ribeiro Alves <dralves@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4a33783e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4a33783e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4a33783e

Branch: refs/heads/master
Commit: 4a33783ee07f88a3bfc08f97e3f6d15fe1b73700
Parents: 80f9512
Author: Andrew Wong <awong@cloudera.com>
Authored: Tue Mar 7 17:22:40 2017 -0800
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Sat Mar 11 00:23:19 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc | 13 ++++---
 src/kudu/consensus/consensus_queue.cc | 54 +++++++++++++++++-------------
 src/kudu/consensus/consensus_queue.h  |  7 +++-
 3 files changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4a33783e/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index d2acdb7..cabcd32 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -325,19 +325,18 @@ Status Peer::PrepareTabletCopyRequest() {
 
 void Peer::ProcessTabletCopyResponse() {
   // If the peer is already closed return.
-  {
-    std::unique_lock<simple_spinlock> lock(peer_lock_);
-    if (closed_) {
-      return;
-    }
-    CHECK(request_pending_);
-    request_pending_ = false;
+  std::unique_lock<simple_spinlock> lock(peer_lock_);
+  if (closed_) {
+    return;
   }
+  CHECK(request_pending_);
+  request_pending_ = false;
 
   if (controller_.status().ok() && tc_response_.has_error()) {
     // ALREADY_INPROGRESS is expected, so we do not log this error.
     if (tc_response_.error().code() ==
         TabletServerErrorPB::TabletServerErrorPB::ALREADY_INPROGRESS) {
+      lock.unlock();
       queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
     } else {
       LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to begin Tablet Copy on peer: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/4a33783e/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index bf746f3..ad98875 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -339,17 +339,22 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,
                                         bool* needs_tablet_copy) {
-  TrackedPeer* peer = nullptr;
+  // Maintain a thread-safe copy of necessary members.
   OpId preceding_id;
+  int num_voters;
+  int64_t current_term;
+  TrackedPeer peer;
+  MonoDelta unreachable_time;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     DCHECK_EQ(queue_state_.state, kQueueOpen);
     DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());
 
-    peer = FindPtrOrNull(peers_map_, uuid);
-    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
+    TrackedPeer* peer_ptr = FindPtrOrNull(peers_map_, uuid);
+    if (PREDICT_FALSE(peer_ptr == nullptr || queue_state_.mode == NON_LEADER)) {
       return Status::NotFound("Peer not tracked or queue not in leader mode.");
     }
+    peer = *peer_ptr;
 
     // Clear the requests without deleting the entries, as they may be in use by other peers.
     request->mutable_ops()->ExtractSubrange(0, request->ops_size(), nullptr);
@@ -357,15 +362,16 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     // This is initialized to the queue's last appended op but gets set to the id of the
     // log entry preceding the first one in 'messages' if messages are found for the peer.
     preceding_id = queue_state_.last_appended;
+    num_voters = CountVoters(*queue_state_.active_config);
+    current_term = queue_state_.current_term;
+
     request->set_committed_index(queue_state_.committed_index);
     request->set_all_replicated_index(queue_state_.all_replicated_index);
-    request->set_caller_term(queue_state_.current_term);
+    request->set_caller_term(current_term);
+    unreachable_time = MonoTime::Now() - peer.last_successful_communication_time;
   }
-
-  MonoDelta unreachable_time =
-      MonoTime::Now() - peer->last_successful_communication_time;
   if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) {
-    if (CountVoters(*queue_state_.active_config) > 2) {
+    if (num_voters > 2) {
       // We never drop from 2 to 1 automatically, at least for now. We may want
       // to revisit this later, we're just being cautious with this.
       string msg = Substitute("Leader has been unable to successfully communicate "
@@ -373,12 +379,11 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
                               uuid,
                               FLAGS_follower_unavailable_considered_failed_sec,
                               unreachable_time.ToString());
-      NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg);
+      NotifyObserversOfFailedFollower(uuid, current_term, msg);
     }
   }
-
-  if (PREDICT_FALSE(peer->needs_tablet_copy)) {
-    KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer->status_log_throttler, "tablet copy")
+  if (PREDICT_FALSE(peer.needs_tablet_copy)) {
+    KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer.status_log_throttler, "tablet copy")
         << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
     *needs_tablet_copy = true;
     return Status::OK();
@@ -388,14 +393,14 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   // If we've never communicated with the peer, we don't know what messages to
   // send, so we'll send a status-only request. Otherwise, we grab requests
   // from the log starting at the last_received point.
-  if (!peer->is_new) {
+  if (!peer.is_new) {
 
     // The batch of messages to send to the peer.
     vector<ReplicateRefPtr> messages;
     int max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSize();
 
     // We try to get the follower's next_index from our log.
-    Status s = log_cache_.ReadOps(peer->next_index - 1,
+    Status s = log_cache_.ReadOps(peer.next_index - 1,
                                   max_batch_size,
                                   &messages,
                                   &preceding_id);
@@ -406,7 +411,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
         string msg = Substitute("The logs necessary to catch up peer $0 have been "
                                 "garbage collected. The follower will never be able "
                                 "to catch up ($1)", uuid, s.ToString());
-        NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg);
+        NotifyObserversOfFailedFollower(uuid, current_term, msg);
         return s;
       // IsIncomplete() means that we tried to read beyond the head of the log
       // (in the future). See KUDU-1078.
@@ -415,12 +420,12 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
         LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
                                         << "while preparing peer request: "
                                         << s.ToString() << ". Destination peer: "
-                                        << peer->ToString();
+                                        << peer.ToString();
         return s;
       }
       LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
                                       << s.ToString() << ". Destination peer: "
-                                      << peer->ToString();
+                                      << peer.ToString();
     }
 
     // We use AddAllocated rather than copy, because we pin the log cache at the
@@ -443,7 +448,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   if (request->ops_size() > 0) {
     int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
     if (last_op_sent < request->committed_index()) {
-      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer->status_log_throttler, "lagging")
+      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer.status_log_throttler, "lagging")
           << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at least "
           << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
@@ -477,26 +482,27 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
 Status PeerMessageQueue::GetTabletCopyRequestForPeer(const string& uuid,
                                                           StartTabletCopyRequestPB* req) {
   TrackedPeer* peer = nullptr;
+  int64_t current_term;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     DCHECK_EQ(queue_state_.state, kQueueOpen);
     DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());
     peer = FindPtrOrNull(peers_map_, uuid);
+    current_term = queue_state_.current_term;
     if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
       return Status::NotFound("Peer not tracked or queue not in leader mode.");
     }
-  }
-
-  if (PREDICT_FALSE(!peer->needs_tablet_copy)) {
-    return Status::IllegalState("Peer does not need to initiate Tablet Copy", uuid);
+    if (PREDICT_FALSE(!peer->needs_tablet_copy)) {
+      return Status::IllegalState("Peer does not need to initiate Tablet Copy", uuid);
+    }
+    peer->needs_tablet_copy = false;
   }
   req->Clear();
   req->set_dest_uuid(uuid);
   req->set_tablet_id(tablet_id_);
   req->set_copy_peer_uuid(local_peer_pb_.permanent_uuid());
   *req->mutable_copy_peer_addr() = local_peer_pb_.last_known_addr();
-  req->set_caller_term(queue_state_.current_term);
-  peer->needs_tablet_copy = false; // Now reset the flag.
+  req->set_caller_term(current_term);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4a33783e/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 3d25553..21c1b42 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -80,6 +80,11 @@ class PeerMessageQueue {
           needs_tablet_copy(false),
           last_seen_term_(0) {}
 
+    TrackedPeer() = default;
+
+    // Copy a given TrackedPeer.
+    TrackedPeer& operator=(const TrackedPeer& tracked_peer) = default;
+
     // Check that the terms seen from a given peer only increase
     // monotonically.
     void CheckMonotonicTerms(int64_t term) {
@@ -90,7 +95,7 @@ class PeerMessageQueue {
     std::string ToString() const;
 
     // UUID of the peer.
-    const std::string uuid;
+    std::string uuid;
 
     // Whether this is a newly tracked peer.
     bool is_new;
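
The header change follows from RequestForPeer declaring `TrackedPeer
peer;` and later assigning `peer = *peer_ptr;`. TrackedPeer already has
a user-declared constructor (its initializer list opens the hunk), which
suppresses the implicit default constructor, hence
`TrackedPeer() = default;`. A `const` data member makes a defaulted copy
assignment operator deleted, which is why `uuid` loses its `const`. A
compile-time sketch with hypothetical ConstUuidPeer and MutableUuidPeer
structs follows.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Like the old TrackedPeer: a const member plus a user-declared constructor.
struct ConstUuidPeer {
  explicit ConstUuidPeer(std::string u) : uuid(std::move(u)) {}
  const std::string uuid;
  bool is_new = true;
};

// Like the new TrackedPeer: non-const member, defaulted default
// constructor and defaulted copy assignment.
struct MutableUuidPeer {
  explicit MutableUuidPeer(std::string u) : uuid(std::move(u)) {}
  MutableUuidPeer() = default;
  MutableUuidPeer& operator=(const MutableUuidPeer&) = default;
  std::string uuid;
  bool is_new = true;
};

// The const member deletes the defaulted copy assignment, and the
// user-declared constructor suppresses the implicit default constructor.
static_assert(!std::is_copy_assignable<ConstUuidPeer>::value,
              "const member blocks copy assignment");
static_assert(!std::is_default_constructible<ConstUuidPeer>::value,
              "no implicit default constructor");
// Copy construction still works; only `T peer; peer = *ptr;` is blocked.
static_assert(std::is_copy_constructible<ConstUuidPeer>::value,
              "copy construction is unaffected");

static_assert(std::is_copy_assignable<MutableUuidPeer>::value,
              "assignable once const is dropped");
static_assert(std::is_default_constructible<MutableUuidPeer>::value,
              "default-constructible via = default");
```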

