From: danburkert@apache.org
To: commits@kudu.incubator.apache.org
Date: Tue, 24 May 2016 19:30:32 -0000
Subject: [4/6] incubator-kudu git commit: Add a generic retriable rpc class

Add a generic retriable rpc class

This patch adds a new generic class for retriable RPCs: RetriableRpc.
This class will handle retry logic, such as setting id/sequence numbers
on the RPC header, as well as tracking the number of attempts.

Derived classes of RetriableRpc no longer have to deal with replica
selection, proxy initialization or retrying. They just have to
implement:

  Try() - Actually sends the RPC to a server.
  AnalyzeResponse() - Buckets the response into a few common categories.
  Finish() - Handles (final) success or failure.

This also refactors WriteRpc to use the new class.
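The split described above, where the base class owns the retry loop and subclasses only supply Try(), AnalyzeResponse() and Finish(), is an instance of the template method pattern. The sketch below shows that shape in a synchronous form. It is not the Kudu API: the names `RetryBucket`, `RetriableCall`, `Run()` and `FlakyWrite` are hypothetical, and a fixed `max_attempts` stands in for the deadline and delayed retries that bound the real, asynchronous RetriableRpc.

```cpp
#include <cassert>

// Hypothetical, simplified stand-in for rpc::RetriableRpcStatus: only the
// buckets this sketch needs.
enum class RetryBucket { kOk, kServerBusy, kNonRetriableError };

// Hypothetical synchronous analogue of RetriableRpc: the base class owns the
// retry loop, subclasses only implement the three hooks.
class RetriableCall {
 public:
  explicit RetriableCall(int max_attempts) : max_attempts_(max_attempts) {}
  virtual ~RetriableCall() = default;

  // Attempts the call until it succeeds, fails non-retriably, or runs out of
  // attempts (a stand-in for the deadline that bounds the real retrier).
  void Run() {
    while (true) {
      ++num_attempts_;
      Try();
      RetryBucket bucket = AnalyzeResponse();
      if (bucket == RetryBucket::kServerBusy && num_attempts_ < max_attempts_) {
        continue;  // Retriable bucket with budget left: go around again.
      }
      Finish(bucket == RetryBucket::kOk);
      return;
    }
  }

  int num_attempts() const { return num_attempts_; }

 protected:
  virtual void Try() = 0;                     // Send the request.
  virtual RetryBucket AnalyzeResponse() = 0;  // Bucket the response.
  virtual void Finish(bool ok) = 0;           // Handle final success or failure.

 private:
  const int max_attempts_;
  int num_attempts_ = 0;
};

// Hypothetical subclass whose server reports "busy" on the first two sends.
class FlakyWrite : public RetriableCall {
 public:
  using RetriableCall::RetriableCall;

  bool finished() const { return finished_; }
  bool succeeded() const { return succeeded_; }

 protected:
  void Try() override { ++sends_; }
  RetryBucket AnalyzeResponse() override {
    return sends_ <= 2 ? RetryBucket::kServerBusy : RetryBucket::kOk;
  }
  void Finish(bool ok) override {
    finished_ = true;
    succeeded_ = ok;
  }

 private:
  int sends_ = 0;
  bool finished_ = false;
  bool succeeded_ = false;
};
```

For instance, a `FlakyWrite` built with a budget of 5 reaches a successful `Finish()` on its third attempt, while a budget of 2 ends in a failed `Finish()` after two attempts. The subclass never sees the loop, which mirrors how WriteRpc no longer contains any retry code after this patch.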
Change-Id: Iaa58bdc5656a5d4d9172885a67363f74718a0c8e
Reviewed-on: http://gerrit.cloudera.org:8080/3064
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/f4a644c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/f4a644c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/f4a644c8

Branch: refs/heads/master
Commit: f4a644c8ad2f47183e43b022e8ce982e3242d9cb
Parents: 18739b2
Author: David Alves
Authored: Wed May 11 17:15:10 2016 -0700
Committer: David Ribeiro Alves
Committed: Tue May 24 18:11:29 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc   | 143 ++++++-----------------------
 src/kudu/rpc/retriable_rpc.h | 184 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 213 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f4a644c8/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 7ea674c..c544b3e 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -44,6 +44,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/retriable_rpc.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/debug-util.h"
@@ -52,6 +53,7 @@
 using std::pair;
 using std::set;
 using std::shared_ptr;
+using std::unique_ptr;
 using std::unordered_map;
 using strings::Substitute;
 
@@ -59,6 +61,8 @@ namespace kudu {
 
 using rpc::ErrorStatusPB;
 using rpc::Messenger;
+using rpc::ResponseCallback;
+using rpc::RetriableRpc;
 using rpc::RetriableRpcStatus;
 using rpc::Rpc;
 using rpc::RpcController;
@@ -180,17 +184,16 @@ struct InFlightOp {
 // leader fails.
 //
 // Keeps a reference on the owning batcher while alive.
-class WriteRpc : public Rpc {
+class WriteRpc : public RetriableRpc<RemoteTabletServer, WriteRequestPB, WriteResponsePB> {
  public:
   WriteRpc(const scoped_refptr<Batcher>& batcher,
-           const scoped_refptr<MetaCacheServerPicker>& server_picker,
+           const scoped_refptr<MetaCacheServerPicker>& replica_picker,
            vector<InFlightOp*> ops,
            const MonoTime& deadline,
            const shared_ptr<Messenger>& messenger,
            const string& tablet_id);
   virtual ~WriteRpc();
-  virtual void SendRpc() OVERRIDE;
-  virtual string ToString() const OVERRIDE;
+  string ToString() const override;
 
   const KuduTable* table() const {
     // All of the ops for a given tablet obviously correspond to the same table,
@@ -201,40 +204,16 @@ class WriteRpc : public Rpc {
   const WriteResponsePB& resp() const { return resp_; }
   const string& tablet_id() const { return tablet_id_; }
 
- private:
-  // Analyzes the response/current status and returns a RetriableRpcStatus containing
-  // an enum that can be used to choose the action to take.
-  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status);
-
-  // Retries the rpc, if possible. Returns true if a retry occurred or
-  // false otherwise.
-  bool RetryIfNeeded(const RetriableRpcStatus& status, RemoteTabletServer* replica);
-
-  // Called with an OK status when the leader has been found and the proxy initialized.
-  // Called with any other status if something failed with 'replica' set to 'nullptr'.
-  void ReplicaFoundCb(const Status& status, RemoteTabletServer* replica);
-
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
+ protected:
+  void Try(RemoteTabletServer* replica, const ResponseCallback& callback) override;
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override;
+  void Finish(const Status& status) override;
 
+ private:
   // Pointer back to the batcher. Processes the write response when it
   // completes, regardless of success or failure.
   scoped_refptr<Batcher> batcher_;
 
-  // The tablet that should receive this write.
-  scoped_refptr<MetaCacheServerPicker> server_picker_;
-
-  // Request body.
-  WriteRequestPB req_;
-
-  // Response body.
-  WriteResponsePB resp_;
-
-  // Keeps track of the tablet server the write was sent to.
-  // TODO Remove this and pass the used replica around. For now we need to keep this as
-  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
-  // being written to.
-  RemoteTabletServer* current_ts_;
-
   // Operations which were batched into this RPC.
   // These operations are in kRequestSent state.
   vector<InFlightOp*> ops_;
@@ -244,15 +223,13 @@ class WriteRpc : public Rpc {
 };
 
 WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
-                   const scoped_refptr<MetaCacheServerPicker>& server_picker,
+                   const scoped_refptr<MetaCacheServerPicker>& replica_picker,
                    vector<InFlightOp*> ops,
                    const MonoTime& deadline,
                    const shared_ptr<Messenger>& messenger,
                    const string& tablet_id)
-    : Rpc(deadline, messenger),
+    : RetriableRpc(replica_picker, deadline, messenger),
       batcher_(batcher),
-      server_picker_(server_picker),
-      current_ts_(nullptr),
       ops_(std::move(ops)),
       tablet_id_(tablet_id) {
   const Schema* schema = table()->schema().schema_;
@@ -311,35 +288,29 @@ WriteRpc::~WriteRpc() {
   STLDeleteElements(&ops_);
 }
 
-void WriteRpc::SendRpc() {
-  server_picker_->PickLeader(Bind(&WriteRpc::ReplicaFoundCb,
-                                  Unretained(this)),
-                             retrier().deadline());
-}
-
 string WriteRpc::ToString() const {
   return Substitute("Write(tablet: $0, num_ops: $1, num_attempts: $2)",
                     tablet_id_, ops_.size(), num_attempts());
 }
 
-void WriteRpc::ReplicaFoundCb(const Status& status, RemoteTabletServer* replica) {
-  RetriableRpcStatus result = AnalyzeResponse(status);
-  if (RetryIfNeeded(result, replica)) return;
-
-  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
-    batcher_->ProcessWriteResponse(*this, status);
-    delete this;
-    return;
-  }
-
-  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
-
+void WriteRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
   VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica "
-      << replica->ToString();
-  current_ts_ = replica;
+          << replica->ToString();
   replica->proxy()->WriteAsync(req_, &resp_, mutable_retrier()->mutable_controller(),
-                               boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
+                               callback);
+}
+
+void WriteRpc::Finish(const Status& status) {
+  unique_ptr<WriteRpc> this_instance(this);
+  Status final_status = status;
+  if (!final_status.ok()) {
+    final_status = final_status.CloneAndPrepend(
+        Substitute("Failed to write batch of $0 ops to tablet $1 after $2 attempt(s)",
+                   ops_.size(), tablet_id_, num_attempts()));
+    LOG(WARNING) << final_status.ToString();
+  }
+  batcher_->ProcessWriteResponse(*this, final_status);
 }
 
 RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
@@ -404,62 +375,6 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   return result;
 }
 
-bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result, RemoteTabletServer* replica) {
-  // Handle the cases where we retry.
-  switch (result.result) {
-    // For writes, always retry a SERVER_BUSY error on the same server.
-    case RetriableRpcStatus::SERVER_BUSY: {
-      break;
-    }
-    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
-      VLOG(1) << "Failing " << ToString() << " to a new replica: " << result.status.ToString();
-      server_picker_->MarkServerFailed(replica, result.status);
-      break;
-    }
-    // The TabletServer was not part of the config serving the tablet.
-    // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
-    // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
-    case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
-      server_picker_->MarkResourceNotFound(replica);
-      break;
-    }
-    // The TabletServer was not the leader of the quorum.
-    case RetriableRpcStatus::REPLICA_NOT_LEADER: {
-      server_picker_->MarkReplicaNotLeader(replica);
-      break;
-    }
-    // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
-    default:
-      return false;
-  }
-  resp_.Clear();
-  mutable_retrier()->DelayedRetry(this, result.status);
-  return true;
-}
-
-void WriteRpc::SendRpcCb(const Status& status) {
-  RetriableRpcStatus result = AnalyzeResponse(status);
-  if (RetryIfNeeded(result, current_ts_)) return;
-
-  // From here on out the rpc has either succeeded of suffered a non-retriable
-  // failure.
-  Status final_status = result.status;
-  if (!final_status.ok()) {
-    string current_ts_string;
-    if (current_ts_) {
-      current_ts_string = Substitute("on tablet server $0", current_ts_->ToString());
-    } else {
-      current_ts_string = "(no tablet server available)";
-    }
-    final_status = final_status.CloneAndPrepend(
-        Substitute("Failed to write batch of $0 ops to tablet $1 $2 after $3 attempt(s)",
-                   ops_.size(), tablet_id_, current_ts_string, num_attempts()));
-    LOG(WARNING) << final_status.ToString();
-  }
-  batcher_->ProcessWriteResponse(*this, final_status);
-  delete this;
-}
-
 Batcher::Batcher(KuduClient* client,
                  ErrorCollector* error_collector,
                  const sp::shared_ptr<KuduSession>& session,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f4a644c8/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
new file mode 100644
index 0000000..526385a
--- /dev/null
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+// A base class for retriable RPCs that handles replica picking and retry logic.
+//
+// The 'Server' template parameter refers to the type of the server that will be looked up
+// and passed to the derived classes on Try(). For instance, in the case of WriteRpc it's
+// RemoteTabletServer.
+//
+// TODO merge RpcRetrier into this class? Can't be done right now as the retrier is used
+// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker.
+//
+// TODO allow to target replicas other than the leader, if needed.
+//
+// TODO once we have retry handling on all the RPCs merge this with rpc::Rpc.
+template <class Server, class RequestPB, class ResponsePB>
+class RetriableRpc : public Rpc {
+ public:
+  RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
+               const MonoTime& deadline,
+               const std::shared_ptr<Messenger>& messenger)
+      : Rpc(deadline, messenger),
+        server_picker_(server_picker) {}
+
+  virtual ~RetriableRpc() {}
+
+  // Performs server lookup/initialization.
+  // If/when the server is looked up and initialized successfully RetriableRpc will call
+  // Try() to actually send the request.
+  void SendRpc() override;
+
+ protected:
+  // Subclasses implement this method to actually try the RPC.
+  // The server has been looked up and is ready to be used.
+  virtual void Try(Server* replica, const ResponseCallback& callback) = 0;
+
+  // Subclasses implement this method to analyze 'status', the controller status or
+  // the response and return a RetriableRpcStatus which will then be used
+  // to decide how to proceed (retry or give up).
+  virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0;
+
+  // Subclasses implement this method to perform cleanup and/or final steps.
+  // After this is called the RPC will no longer be retried.
+  virtual void Finish(const Status& status) = 0;
+
+  // Request body.
+  RequestPB req_;
+
+  // Response body.
+  ResponsePB resp_;
+
+ private:
+  // Decides whether to retry the RPC, based on the result of AnalyzeResponse() and retries
+  // if that is the case.
+  // Returns true if the RPC was retried or false otherwise.
+  bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server);
+
+  // Called when the replica has been looked up.
+  void ReplicaFoundCb(const Status& status, Server* server);
+
+  // Called after the RPC was performed.
+  void SendRpcCb(const Status& status) override;
+
+  scoped_refptr<ServerPicker<Server>> server_picker_;
+  const MonoTime deadline_;
+  std::shared_ptr<Messenger> messenger_;
+
+  // Keeps track of the replica the RPCs were sent to.
+  // TODO Remove this and pass the used replica around. For now we need to keep this as
+  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
+  // being written to.
+  Server* current_;
+};
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc() {
+  server_picker_->PickLeader(Bind(&RetriableRpc::ReplicaFoundCb,
+                                  Unretained(this)),
+                             retrier().deadline());
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableRpcStatus& result,
+                                                                Server* server) {
+  // Handle the cases where we retry.
+  switch (result.result) {
+    // For writes, always retry a SERVER_BUSY error on the same server.
+    case RetriableRpcStatus::SERVER_BUSY: {
+      break;
+    }
+    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
+      VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+      server_picker_->MarkServerFailed(server, result.status);
+      break;
+    }
+    // The TabletServer was not part of the config serving the tablet.
+    // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
+    // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
+    case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
+      server_picker_->MarkResourceNotFound(server);
+
+      break;
+    }
+    // The TabletServer was not the leader of the quorum.
+    case RetriableRpcStatus::REPLICA_NOT_LEADER: {
+      server_picker_->MarkReplicaNotLeader(server);
+      break;
+    }
+    // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
+    default:
+      return false;
+  }
+  resp_.Clear();
+  current_ = nullptr;
+  mutable_retrier()->DelayedRetry(this, result.status);
+  return true;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
+                                                                 Server* server) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, server)) return;
+
+  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
+    Finish(result.status);
+    return;
+  }
+
+  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
+  current_ = server;
+  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, current_)) return;
+
+  // From here on out the RPC has either succeeded or suffered a non-retriable
+  // failure.
+  Status final_status = result.status;
+  if (!final_status.ok()) {
+    string error_string;
+    if (current_) {
+      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
+    } else {
+      error_string = "Failed to write to server: (no server available)";
+    }
+    final_status = final_status.CloneAndPrepend(error_string);
+  }
+  Finish(final_status);
+}
+
+
+}  // namespace rpc
+}  // namespace kudu
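The switch in RetryIfNeeded() maps each RetriableRpcStatus bucket to a piece of feedback for the server picker before a retry is scheduled. The decision table can be exercised in isolation with the standalone sketch below. It is not Kudu code: `Result`, `RecordingPicker` and `ShouldRetry()` are hypothetical names, servers are plain strings rather than `Server*`, and the actual delayed retry (`mutable_retrier()->DelayedRetry()` in the diff) is left to the caller.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical mirror of the RetriableRpcStatus result buckets in the diff.
enum class Result {
  OK,
  SERVER_BUSY,
  SERVER_NOT_ACCESSIBLE,
  RESOURCE_NOT_FOUND,
  REPLICA_NOT_LEADER,
  NON_RETRIABLE_ERROR,
};

// Hypothetical picker that logs the feedback it receives instead of
// updating a real meta cache.
struct RecordingPicker {
  std::vector<std::string> events;
  void MarkServerFailed(const std::string& server) { events.push_back("failed:" + server); }
  void MarkResourceNotFound(const std::string& server) {
    events.push_back("not_found:" + server);
  }
  void MarkReplicaNotLeader(const std::string& server) {
    events.push_back("not_leader:" + server);
  }
};

// Follows the same decision structure as RetryIfNeeded(): give the picker
// feedback for the bucket, then report whether a retry should be scheduled.
bool ShouldRetry(Result result, const std::string& server, RecordingPicker* picker) {
  switch (result) {
    case Result::SERVER_BUSY:
      return true;  // No feedback, so the next lookup may pick the same server.
    case Result::SERVER_NOT_ACCESSIBLE:
      picker->MarkServerFailed(server);
      return true;
    case Result::RESOURCE_NOT_FOUND:
      picker->MarkResourceNotFound(server);
      return true;
    case Result::REPLICA_NOT_LEADER:
      picker->MarkReplicaNotLeader(server);
      return true;
    case Result::OK:
    case Result::NON_RETRIABLE_ERROR:
      return false;  // Terminal: the caller would call Finish() instead.
  }
  return false;  // Not reached for valid enum values.
}
```

Leaving SERVER_BUSY without picker feedback matches the diff: a busy server is still a valid target, whereas the other retriable buckets each tell the picker something that can steer the next PickLeader() lookup away from an unreachable, stale or non-leader replica.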