kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/3] incubator-kudu git commit: Refactor retry handling logic for writes
Date Fri, 13 May 2016 00:04:28 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 2c0ac84e1 -> 92f237ba9


Refactor retry handling logic for writes

This consolidates the retry handling logic for the WriteRpc by adding
two new methods: WriteRpc::AnalyzeResponse() and WriteRpc::RetryIfNeeded().
These are analogous to a recent refactor of the scan code: the first
analyzes the response, splitting it into retryable/non-retryable
categories, whereas the second actually takes action if the category
is retryable.

This is a precursor to other changes that will pull this logic out of
WriteRpc and into somewhere we can use for other rpcs that have the same
behavior (replicated/retryable ones).

This introduces no tests as there is no new functionality, just a
refactoring of the code.

Change-Id: I0e0d491f902191c88c58e3d627106cc1be1bb3cc
Reviewed-on: http://gerrit.cloudera.org:8080/2970
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: David Ribeiro Alves <david.alves@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2c0f805f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2c0f805f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2c0f805f

Branch: refs/heads/master
Commit: 2c0f805f75d4aa02bf1dcf0faf169b4fa60019eb
Parents: 2c0ac84
Author: David Alves <david.alves@cloudera.com>
Authored: Tue May 3 13:19:29 2016 -0700
Committer: David Ribeiro Alves <david.alves@cloudera.com>
Committed: Thu May 12 21:49:44 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc | 158 +++++++++++++++++++++++++++-------------
 src/kudu/rpc/rpc.h         |  31 ++++++++
 2 files changed, 137 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2c0f805f/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index f418bad..bdebe37 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -59,6 +59,7 @@ namespace kudu {
 
 using rpc::ErrorStatusPB;
 using rpc::Messenger;
+using rpc::RetriableRpcStatus;
 using rpc::Rpc;
 using rpc::RpcController;
 using tserver::WriteRequestPB;
@@ -207,9 +208,13 @@ class WriteRpc : public Rpc {
   // Sends the RPC, provided there was no error.
   void InitTSProxyCb(const Status& status);
 
-  // Marks all replicas on current_ts_ as failed and retries the write on a
-  // new replica.
-  void FailToNewReplica(const Status& reason);
+  // Analyzes the response/current status and returns a RetriableRpcStatus containing
+  // an enum that can be used to choose the action to take.
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status);
+
+  // Retries the rpc, if possible. Returns true if a retry occurred or
+  // false otherwise.
+  bool RetryIfNeeded(const RetriableRpcStatus& status);
 
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
@@ -416,7 +421,8 @@ void WriteRpc::LookupTabletCb(const Status& status) {
 void WriteRpc::InitTSProxyCb(const Status& status) {
   // Fail to a replica in the event of a DNS resolution failure.
   if (!status.ok()) {
-    FailToNewReplica(status);
+    RetriableRpcStatus result = AnalyzeResponse(status);
+    CHECK(RetryIfNeeded(result)) << "Should have retried on a DNS resolution failure.";
     return;
   }
 
@@ -427,76 +433,124 @@ void WriteRpc::InitTSProxyCb(const Status& status) {
                                    boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
 }
 
-void WriteRpc::FailToNewReplica(const Status& reason) {
-  VLOG(1) << "Failing " << ToString() << " to a new replica: "
-          << reason.ToString();
-  bool found = tablet_->MarkReplicaFailed(current_ts_, reason);
-  DCHECK(found)
-      << "Tablet " << tablet_->tablet_id() << ": Unable to mark replica " << current_ts_->ToString()
-      << " as failed. Replicas: " << tablet_->ReplicasAsString();
+RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
+  RetriableRpcStatus result;
+  result.status = rpc_cb_status;
 
-  mutable_retrier()->DelayedRetry(this, reason);
-}
+  // If we didn't fail on tablet lookup/proxy initialization, check if we failed actually performing
+  // the write.
+  if (rpc_cb_status.ok()) {
+    result.status = mutable_retrier()->controller().status();
+  }
 
-void WriteRpc::SendRpcCb(const Status& status) {
-  // Prefer early failures over controller failures.
-  Status new_status = status;
-  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
-    return;
+  if (result.status.IsRemoteError()) {
+    const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
+    if (err &&
+        err->has_code() &&
+        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
+      result.result = RetriableRpcStatus::SERVER_BUSY;
+      return result;
+    }
   }
 
-  // Failover to a replica in the event of any network failure.
+  // Failover to a replica in the event of any network failure or of a DNS resolution problem.
   //
   // TODO: This is probably too harsh; some network failures should be
   // retried on the current replica.
-  if (new_status.IsNetworkError()) {
-    FailToNewReplica(new_status);
-    return;
+  if (result.status.IsNetworkError()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
   }
 
   // Prefer controller failures over response failures.
-  if (new_status.ok() && resp_.has_error()) {
-    new_status = StatusFromPB(resp_.error().status());
+  if (result.status.ok() && resp_.has_error()) {
+    result.status = StatusFromPB(resp_.error().status());
   }
 
+  // If we get TABLET_NOT_FOUND, the replica we thought was leader has been deleted.
   if (resp_.has_error() && resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
-    // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
-    // deleted. We mark our tablet cache as stale, forcing a master lookup on
-    // the next attempt.
-    tablet_->MarkStale();
+    result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
+    return result;
+  }
+
+  // Alternatively, when we get a status code of IllegalState or Aborted, we
+  // assume this means that the replica we attempted to write to is not the
+  // current leader (maybe it got partitioned or slow and another node took
+  // over).
+  //
+  // TODO: This error handling block should really be rewritten to handle
+  // specific error codes exclusively instead of Status codes (this may
+  // require some server-side changes). For example, IllegalState is
+  // obviously way too broad an error category for this case.
+  if (result.status.IsIllegalState() || result.status.IsAborted()) {
+    result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+    return result;
+  }
+
+  if (result.status.ok()) {
+    result.result = RetriableRpcStatus::OK;
+  } else {
+    result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+  }
+  return result;
+}
+
+bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result) {
+  // Handle the cases where we retry.
+  switch (result.result) {
+    // For writes, always retry a SERVER_BUSY error on the same server.
+    case RetriableRpcStatus::SERVER_BUSY: {
+      break;
+    }
+    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
+      VLOG(1) << "Failing " << ToString() << " to a new replica: " << result.status.ToString();
+      bool found = tablet_->MarkReplicaFailed(current_ts_, result.status);
+      DCHECK(found) << Substitute("Tablet $0: Unable to mark replica $1 as failed. Replicas: ",
+                                  tablet_->tablet_id(), current_ts_->ToString(),
+                                  tablet_->ReplicasAsString());
+      break;
+    }
+    // The TabletServer was not part of the config serving the tablet.
+    // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
     // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
-    mutable_retrier()->DelayedRetry(this, StatusFromPB(resp_.error().status()));
-    return;
-  } else if (new_status.IsIllegalState() || new_status.IsAborted()) {
-    // Alternatively, when we get a status code of IllegalState or Aborted, we
-    // assume this means that the replica we attempted to write to is not the
-    // current leader (maybe it got partitioned or slow and another node took
-    // over). We attempt to fail over to another replica in the config.
-    //
-    // TODO: This error handling block should really be rewritten to handle
-    // specific error codes exclusively instead of Status codes (this may
-    // require some server-side changes). For example, IllegalState is
-    // obviously way too broad an error category for this case.
-    followers_.insert(current_ts_);
-    mutable_retrier()->DelayedRetry(this, new_status);
-    return;
+    case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
+      tablet_->MarkStale();
+      break;
+    }
+    // The TabletServer was not the leader of the quorum.
+    case RetriableRpcStatus::REPLICA_NOT_LEADER: {
+      followers_.insert(current_ts_);
+      break;
+    }
+    // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
+    default:
+      return false;
   }
+  resp_.Clear();
+  mutable_retrier()->DelayedRetry(this, result.status);
+  return true;
+}
+
+void WriteRpc::SendRpcCb(const Status& status) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result)) return;
 
-  if (!new_status.ok()) {
+  // From here on out the rpc has either succeeded or suffered a non-retriable
+  // failure.
+  Status final_status = result.status;
+  if (!final_status.ok()) {
     string current_ts_string;
     if (current_ts_) {
       current_ts_string = Substitute("on tablet server $0", current_ts_->ToString());
     } else {
       current_ts_string = "(no tablet server available)";
     }
-    new_status = new_status.CloneAndPrepend(
-        Substitute("Failed to write batch of $0 ops to tablet $1 "
-                   "$2 after $3 attempt(s)",
-                   ops_.size(), tablet_->tablet_id(),
-                   current_ts_string, num_attempts()));
-    LOG(WARNING) << new_status.ToString();
-  }
-  batcher_->ProcessWriteResponse(*this, new_status);
+    final_status = final_status.CloneAndPrepend(
+        Substitute("Failed to write batch of $0 ops to tablet $1 $2 after $3 attempt(s)",
+                   ops_.size(), tablet_->tablet_id(), current_ts_string, num_attempts()));
+    LOG(WARNING) << final_status.ToString();
+  }
+  batcher_->ProcessWriteResponse(*this, final_status);
   delete this;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2c0f805f/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.h b/src/kudu/rpc/rpc.h
index 8eb4456..c27abe6 100644
--- a/src/kudu/rpc/rpc.h
+++ b/src/kudu/rpc/rpc.h
@@ -32,6 +32,37 @@ namespace rpc {
 class Messenger;
 class Rpc;
 
+// Result status of a retriable Rpc.
+//
+// TODO Consider merging this with ScanRpcStatus.
+struct RetriableRpcStatus {
+  enum Result {
+    // There was no error, i.e. the Rpc was successful.
+    OK,
+
+    // The Rpc got an error and it's not retriable.
+    NON_RETRIABLE_ERROR,
+
+    // The server couldn't be reached, i.e. there was a network error while
+    // reaching the replica or a DNS resolution problem.
+    SERVER_NOT_ACCESSIBLE,
+
+    // The server is too busy to serve the request.
+    SERVER_BUSY,
+
+    // For rpc's that are meant only for the leader of a shared resource, when the server
+    // we're interacting with is not the leader.
+    REPLICA_NOT_LEADER,
+
+    // The server doesn't know the resource we're interacting with. For instance a TabletServer
+    // is not part of the config for the tablet we're trying to write to.
+    RESOURCE_NOT_FOUND
+  };
+
+  Result result;
+  Status status;
+};
+
 // Provides utilities for retrying failed RPCs.
 //
 // All RPCs should use HandleResponse() to retry certain generic errors.


Mime
View raw message