kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/2] kudu git commit: KUDU-1865: Avoid heap allocation for payload slices
Date Thu, 27 Jul 2017 04:04:17 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 9285f2b44 -> 0ec793e32


KUDU-1865: Avoid heap allocation for payload slices

As shown in KUDU-1865, heap-allocating the temporary vector of
slices that holds the serialized payload introduces measurable
overhead under heavy load. This change replaces the heap
allocation with a stack-allocated array of size
TransferLimits::kMaxPayloadSlices. With this change, we saw a
10%~15% improvement under heavy workload.

Change-Id: I4470d34ba48db5edaeb66d9e739e0c8942004d86
Reviewed-on: http://gerrit.cloudera.org:8080/7471
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c6aa525
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c6aa525
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c6aa525

Branch: refs/heads/master
Commit: 0c6aa525680d4f927c11b7dc85f9d79abf036b87
Parents: 9285f2b
Author: Michael Ho <kwho@cloudera.com>
Authored: Wed Jul 19 20:15:03 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Jul 27 04:03:12 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc    | 18 +++++++-----------
 src/kudu/rpc/connection.h     |  4 ----
 src/kudu/rpc/inbound_call.cc  | 20 ++++++++++++--------
 src/kudu/rpc/inbound_call.h   |  5 +++--
 src/kudu/rpc/outbound_call.cc | 17 ++++++++++++-----
 src/kudu/rpc/outbound_call.h  |  3 ++-
 src/kudu/rpc/transfer.cc      | 26 +++++++++++++-------------
 src/kudu/rpc/transfer.h       | 14 ++++++++++----
 8 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index fc46d67..8dfdbdf 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -310,13 +310,8 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   call->set_call_id(call_id);
 
   // Serialize the actual bytes to be put on the wire.
-  slices_tmp_.clear();
-  Status s = call->SerializeTo(&slices_tmp_);
-  if (PREDICT_FALSE(!s.ok())) {
-    call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
-                                             : Phase::CONNECTION_NEGOTIATION);
-    return;
-  }
+  TransferPayload tmp_slices;
+  size_t n_slices = call->SerializeTo(&tmp_slices);
 
   call->SetQueued();
 
@@ -371,7 +366,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   TransferCallbacks *cb = new CallTransferCallbacks(call);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(gscoped_ptr<OutboundTransfer>(
-      OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb)));
+      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
 }
 
 // Callbacks for sending an RPC call response from the server.
@@ -442,14 +437,15 @@ void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
   // eventually runs in the reactor thread will take care of calling
   // ResponseTransferCallbacks::NotifyTransferAborted.
 
-  std::vector<Slice> slices;
-  call->SerializeResponseTo(&slices);
+  TransferPayload tmp_slices;
+  size_t n_slices = call->SerializeResponseTo(&tmp_slices);
 
   TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
   // After the response is sent, can delete the InboundCall object.
   // We set a dummy call ID and required feature set, since these are not needed
   // when sending responses.
-  gscoped_ptr<OutboundTransfer> t(OutboundTransfer::CreateForCallResponse(slices, cb));
+  gscoped_ptr<OutboundTransfer> t(
+      OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb));
 
   QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
   reactor_thread_->reactor()->ScheduleReactorTask(task);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 816c43c..c6cae38 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -320,10 +320,6 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Starts as Status::OK, gets set to a shutdown status upon Shutdown().
   Status shutdown_status_;
 
-  // Temporary vector used when serializing - avoids an allocation
-  // when serializing calls.
-  std::vector<Slice> slices_tmp_;
-
   // RPC features supported by the remote end of the connection.
   std::set<RpcFeatureFlag> remote_features_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index aba9977..d1c27b7 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -175,16 +175,20 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
                                  &response_hdr_buf_);
 }
 
-void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
+size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
   TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
-  CHECK_GT(response_hdr_buf_.size(), 0);
-  CHECK_GT(response_msg_buf_.size(), 0);
-  slices->reserve(slices->size() + 2 + outbound_sidecars_.size());
-  slices->push_back(Slice(response_hdr_buf_));
-  slices->push_back(Slice(response_msg_buf_));
-  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
-    slices->push_back(car->AsSlice());
+  DCHECK_GT(response_hdr_buf_.size(), 0);
+  DCHECK_GT(response_msg_buf_.size(), 0);
+  size_t n_slices = 2 + outbound_sidecars_.size();
+  DCHECK_LE(n_slices, slices->size());
+  auto slice_iter = slices->begin();
+  *slice_iter++ = Slice(response_hdr_buf_);
+  *slice_iter++ = Slice(response_msg_buf_);
+  for (auto& sidecar : outbound_sidecars_) {
+    *slice_iter++ = sidecar->AsSlice();
   }
+  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+  return n_slices;
 }
 
 Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 6bed18f..84e6745 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -119,9 +119,10 @@ class InboundCall {
                                    const google::protobuf::MessageLite& app_error_pb,
                                    ErrorStatusPB* err);
 
-  // Serialize the response packet for the finished call.
+  // Serialize the response packet for the finished call into 'slices'.
   // The resulting slices refer to memory in this object.
-  void SerializeResponseTo(std::vector<Slice>* slices) const;
+  // Returns the number of slices in the serialized response.
+  size_t SerializeResponseTo(TransferPayload* slices) const;
 
   // See RpcContext::AddRpcSidecar()
   Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index af03f1c..a238568 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -92,7 +92,7 @@ OutboundCall::~OutboundCall() {
   DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
 }
 
-Status OutboundCall::SerializeTo(vector<Slice>* slices) {
+size_t OutboundCall::SerializeTo(TransferPayload* slices) {
   DCHECK_LT(0, request_buf_.size())
       << "Must call SetRequestPayload() before SerializeTo()";
 
@@ -109,10 +109,16 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
   serialization::SerializeHeader(
       header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
-  slices->push_back(Slice(header_buf_));
-  slices->push_back(Slice(request_buf_));
-  for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
-  return Status::OK();
+  size_t n_slices = 2 + sidecars_.size();
+  DCHECK_LE(n_slices, slices->size());
+  auto slice_iter = slices->begin();
+  *slice_iter++ = Slice(header_buf_);
+  *slice_iter++ = Slice(request_buf_);
+  for (auto& sidecar : sidecars_) {
+    *slice_iter++ = sidecar->AsSlice();
+  }
+  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+  return n_slices;
 }
 
 void OutboundCall::SetRequestPayload(const Message& req,
@@ -120,6 +126,7 @@ void OutboundCall::SetRequestPayload(const Message& req,
   DCHECK_EQ(-1, sidecar_byte_size_);
 
   sidecars_ = move(sidecars);
+  DCHECK_LE(sidecars_.size(), TransferLimits::kMaxSidecars);
 
   // Compute total size of sidecar payload so that extra space can be reserved as part of
   // the request body.

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index ebed9b5..16ebc8a 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -154,7 +154,8 @@ class OutboundCall {
 
   // Serialize the call for the wire. Requires that SetRequestPayload()
   // is called first. This is called from the Reactor thread.
-  Status SerializeTo(std::vector<Slice>* slices);
+  // Returns the number of slices in the serialized call.
+  size_t SerializeTo(TransferPayload* slices);
 
   // Callback after the call has been put on the outbound connection queue.
   void SetQueued();

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index d24e94d..d660869 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -135,32 +135,32 @@ string InboundTransfer::StatusAsString() const {
   return Substitute("$0/$1 bytes received", cur_offset_, total_length_);
 }
 
-OutboundTransfer* OutboundTransfer::CreateForCallRequest(
-    int32_t call_id,
-    const std::vector<Slice> &payload,
-    TransferCallbacks *callbacks) {
-  return new OutboundTransfer(call_id, payload, callbacks);
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
+                                                         const TransferPayload &payload,
+                                                         size_t n_payload_slices,
+                                                         TransferCallbacks *callbacks) {
+  return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
 }
 
-OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector<Slice> &payload,
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload,
+                                                          size_t n_payload_slices,
                                                           TransferCallbacks *callbacks) {
-  return new OutboundTransfer(kInvalidCallId, payload, callbacks);
+  return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks);
 }
 
-
 OutboundTransfer::OutboundTransfer(int32_t call_id,
-                                   const std::vector<Slice> &payload,
+                                   const TransferPayload &payload,
+                                   size_t n_payload_slices,
                                    TransferCallbacks *callbacks)
   : cur_slice_idx_(0),
     cur_offset_in_slice_(0),
     callbacks_(callbacks),
     call_id_(call_id),
     aborted_(false) {
-  CHECK(!payload.empty());
 
-  n_payload_slices_ = payload.size();
-  CHECK_LE(n_payload_slices_, arraysize(payload_slices_));
-  for (int i = 0; i < payload.size(); i++) {
+  n_payload_slices_ = n_payload_slices;
+  CHECK_LE(n_payload_slices_, payload_slices_.size());
+  for (int i = 0; i < n_payload_slices; i++) {
     payload_slices_[i] = payload[i];
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 671347a..2a2b726 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_RPC_TRANSFER_H
 #define KUDU_RPC_TRANSFER_H
 
+#include <array>
 #include <boost/intrusive/list.hpp>
 #include <gflags/gflags.h>
 #include <set>
@@ -56,6 +57,8 @@ class TransferLimits {
   DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
 };
 
+typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+
 // This class is used internally by the RPC layer to represent an inbound
 // transfer in progress.
 //
@@ -119,12 +122,14 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
   // Create an outbound transfer for a call request.
   static OutboundTransfer* CreateForCallRequest(int32_t call_id,
-                                                const std::vector<Slice> &payload,
+                                                const TransferPayload &payload,
+                                                size_t n_payload_slices,
                                                 TransferCallbacks *callbacks);
 
   // Create an outbound transfer for a call response.
   // See above for details.
-  static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload,
+  static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
+                                                 size_t n_payload_slices,
                                                  TransferCallbacks *callbacks);
 
   // Destruct the transfer. A transfer object should never be deallocated
@@ -162,12 +167,13 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
  private:
   OutboundTransfer(int32_t call_id,
-                   const std::vector<Slice> &payload,
+                   const TransferPayload& payload,
+                   size_t n_payload_slices,
                    TransferCallbacks *callbacks);
 
   // Slices to send. Uses an array here instead of a vector to avoid an expensive
   // vector construction (improved performance a couple percent).
-  Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
+  TransferPayload payload_slices_;
   size_t n_payload_slices_;
 
   // The current slice that is being sent.


Mime
View raw message