kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [1/4] kudu git commit: [consensus] fix message on unprepared dedup ops
Date Mon, 26 Nov 2018 21:31:14 GMT
Repository: kudu
Updated Branches:
  refs/heads/master f28e8bb81 -> 74aa53f13


[consensus] fix message on unprepared dedup ops

A minor clean-up of RaftConsensus::UpdateReplica() that fixes the
warning message logged for unprepared operations from a deduplicated
request.

Change-Id: Ib9871068743b27720f839797ba6aa6f23cf03a7a
Reviewed-on: http://gerrit.cloudera.org:8080/11982
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b37f264f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b37f264f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b37f264f

Branch: refs/heads/master
Commit: b37f264fbb4f798ee9743ed87e145cd92a7fc63d
Parents: f28e8bb
Author: Alexey Serbin <alexey@apache.org>
Authored: Thu Nov 22 00:17:42 2018 -0800
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Mon Nov 26 14:23:26 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc | 50 ++++++++++++++-----------------
 1 file changed, 22 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b37f264f/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index fc3fdca..d71f8e9 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -21,6 +21,7 @@
 #include <cmath>
 #include <cstdint>
 #include <functional>
+#include <iterator>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -1391,7 +1392,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 
   // The deduplicated request.
   LeaderRequest deduped_req;
-
+  auto& messages = deduped_req.messages;
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
@@ -1403,7 +1404,6 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     deduped_req.leader_uuid = request->caller_uuid();
 
     RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
-
     if (response->status().has_error()) {
       // We had an error, like an invalid term, we still fill the response.
       FillConsensusResponseOKUnlocked(response);
@@ -1440,7 +1440,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 1. As many pending transactions as we can, except...
     // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
     // 3. ...the leader's committed index is always our upper bound.
-    int64_t early_apply_up_to = std::min<int64_t>({
+    const int64_t early_apply_up_to = std::min({
         pending_->GetLastPendingTransactionOpId().index(),
         deduped_req.preceding_opid->index(),
         request->committed_index()});
@@ -1456,13 +1456,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 
     // 2 - Enqueue the prepares
 
-    TRACE("Triggering prepare for $0 ops", deduped_req.messages.size());
-
-    Status prepare_status;
-    auto iter = deduped_req.messages.begin();
-
-    if (PREDICT_TRUE(!deduped_req.messages.empty())) {
+    TRACE("Triggering prepare for $0 ops", messages.size());
 
+    if (PREDICT_TRUE(!messages.empty())) {
       // This request contains at least one message, and is likely to increase
       // our memory pressure.
       double capacity_pct;
@@ -1482,12 +1478,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       }
     }
 
-    while (iter != deduped_req.messages.end()) {
+    Status prepare_status;
+    auto iter = messages.begin();
+    while (iter != messages.end()) {
       prepare_status = StartFollowerTransactionUnlocked(*iter);
       if (PREDICT_FALSE(!prepare_status.ok())) {
         break;
       }
-      // TODO(dralves) Without leader leases this shouldn't be a allowed to fail.
+      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
       // Once we have that functionality we'll have to revisit this.
       CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
       ++iter;
@@ -1497,22 +1495,18 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
     // that were actually prepared, and deleting the other ones since we've taken ownership
     // when we first deduped.
-    if (iter != deduped_req.messages.end()) {
-      bool need_to_warn = true;
-      while (iter != deduped_req.messages.end()) {
-        ReplicateRefPtr msg = (*iter);
-        iter = deduped_req.messages.erase(iter);
-        if (need_to_warn) {
-          need_to_warn = false;
-          LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Could not prepare transaction for op "
-              << msg->get()->id() << " and following " << deduped_req.messages.size() <<
-              " ops. Status for this op: " << prepare_status.ToString();
-        }
-      }
+    if (iter != messages.end()) {
+      LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute(
+          "Could not prepare transaction for op '$0' and following $1 ops. "
+          "Status for this op: $2",
+          (*iter)->get()->id().ShortDebugString(),
+          std::distance(iter, messages.end()) - 1,
+          prepare_status.ToString());
+      iter = messages.erase(iter, messages.end());
 
       // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
       // else we can do. The leader will detect this and retry later.
-      if (deduped_req.messages.empty()) {
+      if (messages.empty()) {
         string msg = Substitute("Rejecting Update request from peer $0 for term $1. "
                                 "Could not prepare a single transaction due to: $2",
                                 request->caller_uuid(),
@@ -1538,14 +1532,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 3 - Enqueue the writes.
     // Now that we've triggered the prepares enqueue the operations to be written
     // to the WAL.
-    if (PREDICT_TRUE(!deduped_req.messages.empty())) {
-      last_from_leader = deduped_req.messages.back()->get()->id();
+    if (PREDICT_TRUE(!messages.empty())) {
+      last_from_leader = messages.back()->get()->id();
       // Trigger the log append asap, if fsync() is on this might take a while
       // and we can't reply until this is done.
       //
       // Since we've prepared, we need to be able to append (or we risk trying to apply
       // later something that wasn't logged). We crash if we can't.
-      CHECK_OK(queue_->AppendOperations(deduped_req.messages, sync_status_cb));
+      CHECK_OK(queue_->AppendOperations(messages, sync_status_cb));
     } else {
       last_from_leader = *deduped_req.preceding_opid;
     }
@@ -1585,7 +1579,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
   // We'll re-acquire it before we update the state again.
 
   // Update the last replicated op id
-  if (!deduped_req.messages.empty()) {
+  if (!messages.empty()) {
 
     // 5 - We wait for the writes to be durable.
 


Mime
View raw message