kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [4/6] incubator-kudu git commit: Add a generic retriable rpc class
Date Tue, 24 May 2016 19:30:32 GMT
Add a generic retriable rpc class

This patch adds a new, generic, class for retriable Rpcs: RetriableRpc.
This class will handle retry logic, such as setting id/sequence numbers on
the rpc header, as well as number of attempts.

Derived classes of RetriableRpc no longer have to deal with logic regarding
replica selection, proxy initialization or retrying. They just have to implement:
Try() - Actually sends the rpc to a server.
AnalyzeResponse() - Buckets the response into a few, common, categories.
Finish() - Handle (final) success or failure.

This also refactors WriteRpc to use the new class.

Change-Id: Iaa58bdc5656a5d4d9172885a67363f74718a0c8e
Reviewed-on: http://gerrit.cloudera.org:8080/3064
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/f4a644c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/f4a644c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/f4a644c8

Branch: refs/heads/master
Commit: f4a644c8ad2f47183e43b022e8ce982e3242d9cb
Parents: 18739b2
Author: David Alves <david.alves@cloudera.com>
Authored: Wed May 11 17:15:10 2016 -0700
Committer: David Ribeiro Alves <david.alves@cloudera.com>
Committed: Tue May 24 18:11:29 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc   | 143 ++++++-----------------------
 src/kudu/rpc/retriable_rpc.h | 184 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 213 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f4a644c8/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 7ea674c..c544b3e 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -44,6 +44,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/retriable_rpc.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/debug-util.h"
@@ -52,6 +53,7 @@
 using std::pair;
 using std::set;
 using std::shared_ptr;
+using std::unique_ptr;
 using std::unordered_map;
 using strings::Substitute;
 
@@ -59,6 +61,8 @@ namespace kudu {
 
 using rpc::ErrorStatusPB;
 using rpc::Messenger;
+using rpc::ResponseCallback;
+using rpc::RetriableRpc;
 using rpc::RetriableRpcStatus;
 using rpc::Rpc;
 using rpc::RpcController;
@@ -180,17 +184,16 @@ struct InFlightOp {
 // leader fails.
 //
 // Keeps a reference on the owning batcher while alive.
-class WriteRpc : public Rpc {
+class WriteRpc : public RetriableRpc<RemoteTabletServer, WriteRequestPB, WriteResponsePB> {
  public:
   WriteRpc(const scoped_refptr<Batcher>& batcher,
-           const scoped_refptr<MetaCacheServerPicker>& server_picker,
+           const scoped_refptr<MetaCacheServerPicker>& replica_picker,
            vector<InFlightOp*> ops,
            const MonoTime& deadline,
            const shared_ptr<Messenger>& messenger,
            const string& tablet_id);
   virtual ~WriteRpc();
-  virtual void SendRpc() OVERRIDE;
-  virtual string ToString() const OVERRIDE;
+  string ToString() const override;
 
   const KuduTable* table() const {
     // All of the ops for a given tablet obviously correspond to the same table,
@@ -201,40 +204,16 @@ class WriteRpc : public Rpc {
   const WriteResponsePB& resp() const { return resp_; }
   const string& tablet_id() const { return tablet_id_; }
 
- private:
-  // Analyzes the response/current status and returns a RetriableRpcStatus containing
-  // an enum that can be used to choose the action to take.
-  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status);
-
-  // Retries the rpc, if possible. Returns true if a retry occurred or
-  // false otherwise.
-  bool RetryIfNeeded(const RetriableRpcStatus& status,  RemoteTabletServer* replica);
-
-  // Called with an OK status when the leader has been found and the proxy initialized.
-  // Called with any other status if something failed with 'replica' set to 'nullptr'.
-  void ReplicaFoundCb(const Status& status, RemoteTabletServer* replica);
-
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
+ protected:
+  void Try(RemoteTabletServer* replica, const ResponseCallback& callback) override;
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override;
+  void Finish(const Status& status) override;
 
+ private:
   // Pointer back to the batcher. Processes the write response when it
   // completes, regardless of success or failure.
   scoped_refptr<Batcher> batcher_;
 
-  // The tablet that should receive this write.
-  scoped_refptr<MetaCacheServerPicker> server_picker_;
-
-  // Request body.
-  WriteRequestPB req_;
-
-  // Response body.
-  WriteResponsePB resp_;
-
-  // Keeps track of the tablet server the write was sent to.
-  // TODO Remove this and pass the used replica around. For now we need to keep this as
-  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
-  // being written to.
-  RemoteTabletServer* current_ts_;
-
   // Operations which were batched into this RPC.
   // These operations are in kRequestSent state.
   vector<InFlightOp*> ops_;
@@ -244,15 +223,13 @@ class WriteRpc : public Rpc {
 };
 
 WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
-                   const scoped_refptr<MetaCacheServerPicker>& server_picker,
+                   const scoped_refptr<MetaCacheServerPicker>& replica_picker,
                    vector<InFlightOp*> ops,
                    const MonoTime& deadline,
                    const shared_ptr<Messenger>& messenger,
                    const string& tablet_id)
-    : Rpc(deadline, messenger),
+    : RetriableRpc(replica_picker, deadline, messenger),
       batcher_(batcher),
-      server_picker_(server_picker),
-      current_ts_(nullptr),
       ops_(std::move(ops)),
       tablet_id_(tablet_id) {
   const Schema* schema = table()->schema().schema_;
@@ -311,35 +288,29 @@ WriteRpc::~WriteRpc() {
   STLDeleteElements(&ops_);
 }
 
-void WriteRpc::SendRpc() {
-  server_picker_->PickLeader(Bind(&WriteRpc::ReplicaFoundCb,
-                                  Unretained(this)),
-                              retrier().deadline());
-}
-
 string WriteRpc::ToString() const {
   return Substitute("Write(tablet: $0, num_ops: $1, num_attempts: $2)",
                     tablet_id_, ops_.size(), num_attempts());
 }
 
-void WriteRpc::ReplicaFoundCb(const Status& status, RemoteTabletServer* replica) {
-  RetriableRpcStatus result = AnalyzeResponse(status);
-  if (RetryIfNeeded(result, replica)) return;
-
-  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
-    batcher_->ProcessWriteResponse(*this, status);
-    delete this;
-    return;
-  }
-
-  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
-
+void WriteRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
   VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica "
-          << replica->ToString();
-  current_ts_ = replica;
+      << replica->ToString();
   replica->proxy()->WriteAsync(req_, &resp_,
                                mutable_retrier()->mutable_controller(),
-                               boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
+                               callback);
+}
+
+void WriteRpc::Finish(const Status& status) {
+  unique_ptr<WriteRpc> this_instance(this);
+  Status final_status = status;
+  if (!final_status.ok()) {
+    final_status = final_status.CloneAndPrepend(
+        Substitute("Failed to write batch of $0 ops to tablet $1 after $2 attempt(s)",
+                   ops_.size(), tablet_id_, num_attempts()));
+    LOG(WARNING) << final_status.ToString();
+  }
+  batcher_->ProcessWriteResponse(*this, final_status);
 }
 
 RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
@@ -404,62 +375,6 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   return result;
 }
 
-bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result, RemoteTabletServer* replica) {
-  // Handle the cases where we retry.
-  switch (result.result) {
-    // For writes, always retry a SERVER_BUSY error on the same server.
-    case RetriableRpcStatus::SERVER_BUSY: {
-      break;
-    }
-    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
-      VLOG(1) << "Failing " << ToString() << " to a new replica: " << result.status.ToString();
-      server_picker_->MarkServerFailed(replica, result.status);
-      break;
-    }
-    // The TabletServer was not part of the config serving the tablet.
-    // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
-    // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
-    case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
-      server_picker_->MarkResourceNotFound(replica);
-      break;
-    }
-    // The TabletServer was not the leader of the quorum.
-    case RetriableRpcStatus::REPLICA_NOT_LEADER: {
-      server_picker_->MarkReplicaNotLeader(replica);
-      break;
-    }
-    // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
-    default:
-      return false;
-  }
-  resp_.Clear();
-  mutable_retrier()->DelayedRetry(this, result.status);
-  return true;
-}
-
-void WriteRpc::SendRpcCb(const Status& status) {
-  RetriableRpcStatus result = AnalyzeResponse(status);
-  if (RetryIfNeeded(result, current_ts_)) return;
-
-  // From here on out the rpc has either succeeded of suffered a non-retriable
-  // failure.
-  Status final_status = result.status;
-  if (!final_status.ok()) {
-    string current_ts_string;
-    if (current_ts_) {
-      current_ts_string = Substitute("on tablet server $0", current_ts_->ToString());
-    } else {
-      current_ts_string = "(no tablet server available)";
-    }
-    final_status = final_status.CloneAndPrepend(
-        Substitute("Failed to write batch of $0 ops to tablet $1 $2 after $3 attempt(s)",
-                   ops_.size(), tablet_id_, current_ts_string, num_attempts()));
-    LOG(WARNING) << final_status.ToString();
-  }
-  batcher_->ProcessWriteResponse(*this, final_status);
-  delete this;
-}
-
 Batcher::Batcher(KuduClient* client,
                  ErrorCollector* error_collector,
                  const sp::shared_ptr<KuduSession>& session,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f4a644c8/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
new file mode 100644
index 0000000..526385a
--- /dev/null
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+// A base class for retriable RPCs that handles replica picking and retry logic.
+//
+// The 'Server' template parameter refers to the type of the server that will be looked up
+// and passed to the derived classes on Try(). For instance in the case of WriteRpc it's
+// RemoteTabletServer.
+//
+// TODO merge RpcRetrier into this class? Can't be done right now as the retrier is used
+// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker.
+//
+// TODO allow to target replicas other than the leader, if needed.
+//
+// TODO once we have retry handling on all the RPCs merge this with rpc::Rpc.
+template <class Server, class RequestPB, class ResponsePB>
+class RetriableRpc : public Rpc {
+ public:
+  RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
+               const MonoTime& deadline,
+               const std::shared_ptr<Messenger>& messenger)
+   : Rpc(deadline, messenger),
+     server_picker_(server_picker) {}
+
+  virtual ~RetriableRpc() {}
+
+  // Performs server lookup/initialization.
+  // If/when the server is looked up and initialized successfully RetriableRpc will call
+  // Try() to actually send the request.
+  void SendRpc() override;
+
+ protected:
+  // Subclasses implement this method to actually try the RPC.
+  // The server has been looked up and is ready to be used.
+  virtual void Try(Server* replica, const ResponseCallback& callback) = 0;
+
+  // Subclasses implement this method to analyze 'status', the controller status or
+  // the response and return a RetriableRpcStatus which will then be used
+  // to decide how to proceed (retry or give up).
+  virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0;
+
+  // Subclasses implement this method to perform cleanup and/or final steps.
+  // After this is called the RPC will be no longer retried.
+  virtual void Finish(const Status& status) = 0;
+
+  // Request body.
+  RequestPB req_;
+
+  // Response body.
+  ResponsePB resp_;
+
+ private:
+  // Decides whether to retry the RPC, based on the result of AnalyzeResponse() and retries
+  // if that is the case.
+  // Returns true if the RPC was retried or false otherwise.
+  bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server);
+
+  // Called when the replica has been looked up.
+  void ReplicaFoundCb(const Status& status, Server* server);
+
+  // Called after the RPC was performed.
+  void SendRpcCb(const Status& status) override;
+
+  scoped_refptr<ServerPicker<Server>> server_picker_;
+  const MonoTime deadline_;
+  std::shared_ptr<Messenger> messenger_;
+
+  // Keeps track of the replica the RPCs were sent to.
+  // TODO Remove this and pass the used replica around. For now we need to keep this as
+  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
+  // being written to.
+  Server* current_;
+};
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc()  {
+  server_picker_->PickLeader(Bind(&RetriableRpc::ReplicaFoundCb,
+                                  Unretained(this)),
+                             retrier().deadline());
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableRpcStatus& result,
+                                                                Server* server) {
+  // Handle the cases where we retry.
+  switch (result.result) {
+    // For writes, always retry a TOO_BUSY error on the same server.
+    case RetriableRpcStatus::SERVER_BUSY: {
+      break;
+    }
+    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
+      VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+      server_picker_->MarkServerFailed(server, result.status);
+      break;
+    }
+      // The TabletServer was not part of the config serving the tablet.
+      // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
+      // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
+    case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
+      server_picker_->MarkResourceNotFound(server);
+
+      break;
+    }
+      // The TabletServer was not the leader of the quorum.
+    case RetriableRpcStatus::REPLICA_NOT_LEADER: {
+      server_picker_->MarkReplicaNotLeader(server);
+      break;
+    }
+      // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
+    default:
+      return false;
+  }
+  resp_.Clear();
+  current_ = nullptr;
+  mutable_retrier()->DelayedRetry(this, result.status);
+  return true;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
+                                                                 Server* server) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, server)) return;
+
+  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
+    Finish(result.status);
+    return;
+  }
+
+  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
+  current_ = server;
+  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, current_)) return;
+
+  // From here on out the RPC has either succeeded or suffered a non-retriable
+  // failure.
+  Status final_status = result.status;
+  if (!final_status.ok()) {
+    string error_string;
+    if (current_) {
+      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
+    } else {
+      error_string = "Failed to write to server: (no server available)";
+    }
+    final_status = final_status.CloneAndPrepend(error_string);
+  }
+  Finish(final_status);
+}
+
+
+} // namespace rpc
+} // namespace kudu


Mime
View raw message