Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00E15200CE0 for ; Thu, 27 Jul 2017 06:04:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F384016A199; Thu, 27 Jul 2017 04:04:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ED6F116A196 for ; Thu, 27 Jul 2017 06:04:18 +0200 (CEST) Received: (qmail 60448 invoked by uid 500); 27 Jul 2017 04:04:18 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 60436 invoked by uid 99); 27 Jul 2017 04:04:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jul 2017 04:04:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5870CE9624; Thu, 27 Jul 2017 04:04:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: todd@apache.org To: commits@kudu.apache.org Date: Thu, 27 Jul 2017 04:04:17 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] kudu git commit: KUDU-1865: Avoid heap allocation for payload slices archived-at: Thu, 27 Jul 2017 04:04:20 -0000 Repository: kudu Updated Branches: refs/heads/master 9285f2b44 -> 0ec793e32 KUDU-1865: Avoid heap allocation for payload slices As shown in KUDU-1865, the heap allocation for the temporary vector for the slices for holding the serialized payload is introducing measurable overhead under heavy load. 
This change replaces the heap allocation with a stack allocation of an array of size TransferLimits::kMaxPayloadSlices. With this change, we saw 10%~15% improvement under heavy workload. Change-Id: I4470d34ba48db5edaeb66d9e739e0c8942004d86 Reviewed-on: http://gerrit.cloudera.org:8080/7471 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c6aa525 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c6aa525 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c6aa525 Branch: refs/heads/master Commit: 0c6aa525680d4f927c11b7dc85f9d79abf036b87 Parents: 9285f2b Author: Michael Ho Authored: Wed Jul 19 20:15:03 2017 -0700 Committer: Todd Lipcon Committed: Thu Jul 27 04:03:12 2017 +0000 ---------------------------------------------------------------------- src/kudu/rpc/connection.cc | 18 +++++++----------- src/kudu/rpc/connection.h | 4 ---- src/kudu/rpc/inbound_call.cc | 20 ++++++++++++-------- src/kudu/rpc/inbound_call.h | 5 +++-- src/kudu/rpc/outbound_call.cc | 17 ++++++++++++----- src/kudu/rpc/outbound_call.h | 3 ++- src/kudu/rpc/transfer.cc | 26 +++++++++++++------------- src/kudu/rpc/transfer.h | 14 ++++++++++---- 8 files changed, 59 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc index fc46d67..8dfdbdf 100644 --- a/src/kudu/rpc/connection.cc +++ b/src/kudu/rpc/connection.cc @@ -310,13 +310,8 @@ void Connection::QueueOutboundCall(const shared_ptr &call) { call->set_call_id(call_id); // Serialize the actual bytes to be put on the wire. 
- slices_tmp_.clear(); - Status s = call->SerializeTo(&slices_tmp_); - if (PREDICT_FALSE(!s.ok())) { - call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL - : Phase::CONNECTION_NEGOTIATION); - return; - } + TransferPayload tmp_slices; + size_t n_slices = call->SerializeTo(&tmp_slices); call->SetQueued(); @@ -371,7 +366,7 @@ void Connection::QueueOutboundCall(const shared_ptr &call) { TransferCallbacks *cb = new CallTransferCallbacks(call); awaiting_response_[call_id] = car.release(); QueueOutbound(gscoped_ptr( - OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb))); + OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb))); } // Callbacks for sending an RPC call response from the server. @@ -442,14 +437,15 @@ void Connection::QueueResponseForCall(gscoped_ptr call) { // eventually runs in the reactor thread will take care of calling // ResponseTransferCallbacks::NotifyTransferAborted. - std::vector slices; - call->SerializeResponseTo(&slices); + TransferPayload tmp_slices; + size_t n_slices = call->SerializeResponseTo(&tmp_slices); TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this); // After the response is sent, can delete the InboundCall object. // We set a dummy call ID and required feature set, since these are not needed // when sending responses. 
- gscoped_ptr t(OutboundTransfer::CreateForCallResponse(slices, cb)); + gscoped_ptr t( + OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb)); QueueTransferTask *task = new QueueTransferTask(std::move(t), this); reactor_thread_->reactor()->ScheduleReactorTask(task); http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h index 816c43c..c6cae38 100644 --- a/src/kudu/rpc/connection.h +++ b/src/kudu/rpc/connection.h @@ -320,10 +320,6 @@ class Connection : public RefCountedThreadSafe { // Starts as Status::OK, gets set to a shutdown status upon Shutdown(). Status shutdown_status_; - // Temporary vector used when serializing - avoids an allocation - // when serializing calls. - std::vector slices_tmp_; - // RPC features supported by the remote end of the connection. std::set remote_features_; http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc index aba9977..d1c27b7 100644 --- a/src/kudu/rpc/inbound_call.cc +++ b/src/kudu/rpc/inbound_call.cc @@ -175,16 +175,20 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response, &response_hdr_buf_); } -void InboundCall::SerializeResponseTo(vector* slices) const { +size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const { TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo"); - CHECK_GT(response_hdr_buf_.size(), 0); - CHECK_GT(response_msg_buf_.size(), 0); - slices->reserve(slices->size() + 2 + outbound_sidecars_.size()); - slices->push_back(Slice(response_hdr_buf_)); - slices->push_back(Slice(response_msg_buf_)); - for (const unique_ptr& car : outbound_sidecars_) { - slices->push_back(car->AsSlice()); + DCHECK_GT(response_hdr_buf_.size(), 0); + 
DCHECK_GT(response_msg_buf_.size(), 0); + size_t n_slices = 2 + outbound_sidecars_.size(); + DCHECK_LE(n_slices, slices->size()); + auto slice_iter = slices->begin(); + *slice_iter++ = Slice(response_hdr_buf_); + *slice_iter++ = Slice(response_msg_buf_); + for (auto& sidecar : outbound_sidecars_) { + *slice_iter++ = sidecar->AsSlice(); } + DCHECK_EQ(slice_iter - slices->begin(), n_slices); + return n_slices; } Status InboundCall::AddOutboundSidecar(unique_ptr car, int* idx) { http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h index 6bed18f..84e6745 100644 --- a/src/kudu/rpc/inbound_call.h +++ b/src/kudu/rpc/inbound_call.h @@ -119,9 +119,10 @@ class InboundCall { const google::protobuf::MessageLite& app_error_pb, ErrorStatusPB* err); - // Serialize the response packet for the finished call. + // Serialize the response packet for the finished call into 'slices'. // The resulting slices refer to memory in this object. - void SerializeResponseTo(std::vector* slices) const; + // Returns the number of slices in the serialized response. 
+ size_t SerializeResponseTo(TransferPayload* slices) const; // See RpcContext::AddRpcSidecar() Status AddOutboundSidecar(std::unique_ptr car, int* idx); http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc index af03f1c..a238568 100644 --- a/src/kudu/rpc/outbound_call.cc +++ b/src/kudu/rpc/outbound_call.cc @@ -92,7 +92,7 @@ OutboundCall::~OutboundCall() { DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_); } -Status OutboundCall::SerializeTo(vector* slices) { +size_t OutboundCall::SerializeTo(TransferPayload* slices) { DCHECK_LT(0, request_buf_.size()) << "Must call SetRequestPayload() before SerializeTo()"; @@ -109,10 +109,16 @@ Status OutboundCall::SerializeTo(vector* slices) { serialization::SerializeHeader( header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_); - slices->push_back(Slice(header_buf_)); - slices->push_back(Slice(request_buf_)); - for (const unique_ptr& car : sidecars_) slices->push_back(car->AsSlice()); - return Status::OK(); + size_t n_slices = 2 + sidecars_.size(); + DCHECK_LE(n_slices, slices->size()); + auto slice_iter = slices->begin(); + *slice_iter++ = Slice(header_buf_); + *slice_iter++ = Slice(request_buf_); + for (auto& sidecar : sidecars_) { + *slice_iter++ = sidecar->AsSlice(); + } + DCHECK_EQ(slice_iter - slices->begin(), n_slices); + return n_slices; } void OutboundCall::SetRequestPayload(const Message& req, @@ -120,6 +126,7 @@ void OutboundCall::SetRequestPayload(const Message& req, DCHECK_EQ(-1, sidecar_byte_size_); sidecars_ = move(sidecars); + DCHECK_LE(sidecars_.size(), TransferLimits::kMaxSidecars); // Compute total size of sidecar payload so that extra space can be reserved as part of // the request body. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h index ebed9b5..16ebc8a 100644 --- a/src/kudu/rpc/outbound_call.h +++ b/src/kudu/rpc/outbound_call.h @@ -154,7 +154,8 @@ class OutboundCall { // Serialize the call for the wire. Requires that SetRequestPayload() // is called first. This is called from the Reactor thread. - Status SerializeTo(std::vector* slices); + // Returns the number of slices in the serialized call. + size_t SerializeTo(TransferPayload* slices); // Callback after the call has been put on the outbound connection queue. void SetQueued(); http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index d24e94d..d660869 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -135,32 +135,32 @@ string InboundTransfer::StatusAsString() const { return Substitute("$0/$1 bytes received", cur_offset_, total_length_); } -OutboundTransfer* OutboundTransfer::CreateForCallRequest( - int32_t call_id, - const std::vector &payload, - TransferCallbacks *callbacks) { - return new OutboundTransfer(call_id, payload, callbacks); +OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id, + const TransferPayload &payload, + size_t n_payload_slices, + TransferCallbacks *callbacks) { + return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks); } -OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector &payload, +OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks) { - return new OutboundTransfer(kInvalidCallId, payload, callbacks); + return new OutboundTransfer(kInvalidCallId, 
payload, n_payload_slices, callbacks); } - OutboundTransfer::OutboundTransfer(int32_t call_id, - const std::vector &payload, + const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks) : cur_slice_idx_(0), cur_offset_in_slice_(0), callbacks_(callbacks), call_id_(call_id), aborted_(false) { - CHECK(!payload.empty()); - n_payload_slices_ = payload.size(); - CHECK_LE(n_payload_slices_, arraysize(payload_slices_)); - for (int i = 0; i < payload.size(); i++) { + n_payload_slices_ = n_payload_slices; + CHECK_LE(n_payload_slices_, payload_slices_.size()); + for (int i = 0; i < n_payload_slices; i++) { payload_slices_[i] = payload[i]; } } http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 671347a..2a2b726 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -18,6 +18,7 @@ #ifndef KUDU_RPC_TRANSFER_H #define KUDU_RPC_TRANSFER_H +#include #include #include #include @@ -56,6 +57,8 @@ class TransferLimits { DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits); }; +typedef std::array TransferPayload; + // This class is used internally by the RPC layer to represent an inbound // transfer in progress. // @@ -119,12 +122,14 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Create an outbound transfer for a call request. static OutboundTransfer* CreateForCallRequest(int32_t call_id, - const std::vector &payload, + const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Create an outbound transfer for a call response. // See above for details. - static OutboundTransfer* CreateForCallResponse(const std::vector &payload, + static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Destruct the transfer. 
A transfer object should never be deallocated @@ -162,12 +167,13 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { private: OutboundTransfer(int32_t call_id, - const std::vector<Slice> &payload, + const TransferPayload& payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Slices to send. Uses an array here instead of a vector to avoid an expensive // vector construction (improved performance a couple percent). - Slice payload_slices_[TransferLimits::kMaxPayloadSlices]; + TransferPayload payload_slices_; size_t n_payload_slices_; // The current slice that is being sent.