impala-commits mailing list archives

From tarmstr...@apache.org
Subject [13/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8
Date Thu, 17 Aug 2017 03:14:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h
new file mode 100644
index 0000000..6bed18f
--- /dev/null
+++ b/be/src/kudu/rpc/inbound_call.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_INBOUND_CALL_H
+#define KUDU_RPC_INBOUND_CALL_H
+
+#include <glog/logging.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Histogram;
+class Trace;
+
+namespace rpc {
+
+class Connection;
+class DumpRunningRpcsRequestPB;
+class RemoteUser;
+class RpcCallInProgressPB;
+struct RpcMethodInfo;
+class RpcSidecar;
+
+struct InboundCallTiming {
+  MonoTime time_received;   // Time the call was first accepted.
+  MonoTime time_handled;    // Time the call handler was kicked off.
+  MonoTime time_completed;  // Time the call handler completed.
+
+  MonoDelta TotalDuration() const {
+    return time_completed - time_received;
+  }
+};
+
+// Inbound call on server
+class InboundCall {
+ public:
+  explicit InboundCall(Connection* conn);
+  ~InboundCall();
+
+  // Parse an inbound call message.
+  //
+  // This only deserializes the call header, populating the 'header_' and
+  // 'serialized_request_' member variables. The actual call parameter is
+  // not deserialized, as this may be CPU-expensive, and this is called
+  // from the reactor thread.
+  Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
+
+  // Return the serialized request parameter protobuf.
+  const Slice& serialized_request() const {
+    DCHECK(transfer_) << "Transfer discarded before parameter parsing";
+    return serialized_request_;
+  }
+
+  const RemoteMethod& remote_method() const {
+    return remote_method_;
+  }
+
+  int32_t call_id() const {
+    return header_.call_id();
+  }
+
+  // Serializes 'response' into the InboundCall's internal buffer, and marks
+  // the call as a success. Enqueues the response back to the connection
+  // that made the call.
+  //
+  // This method deletes the InboundCall object, so no further calls may be
+  // made after this one.
+  void RespondSuccess(const google::protobuf::MessageLite& response);
+
+  // Serializes a failure response into the internal buffer, marking the
+  // call as a failure. Enqueues the response back to the connection that
+  // made the call.
+  //
+  // This method deletes the InboundCall object, so no further calls may be
+  // made after this one.
+  void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
+                      const Status &status);
+
+  void RespondUnsupportedFeature(const std::vector<uint32_t>& unsupported_features);
+
+  void RespondApplicationError(int error_ext_id, const std::string& message,
+                               const google::protobuf::MessageLite& app_error_pb);
+
+  // Convert an application error extension to an ErrorStatusPB.
+  // These ErrorStatusPB objects are what are returned in application error responses.
+  static void ApplicationErrorToPB(int error_ext_id, const std::string& message,
+                                   const google::protobuf::MessageLite& app_error_pb,
+                                   ErrorStatusPB* err);
+
+  // Serialize the response packet for the finished call.
+  // The resulting slices refer to memory in this object.
+  void SerializeResponseTo(std::vector<Slice>* slices) const;
+
+  // See RpcContext::AddRpcSidecar()
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  std::string ToString() const;
+
+  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+
+  const RemoteUser& remote_user() const;
+
+  const Sockaddr& remote_address() const;
+
+  const scoped_refptr<Connection>& connection() const;
+
+  Trace* trace();
+
+  const InboundCallTiming& timing() const {
+    return timing_;
+  }
+
+  const RequestHeader& header() const {
+    return header_;
+  }
+
+  // Associate this call with a particular method that will be invoked
+  // by the service.
+  void set_method_info(scoped_refptr<RpcMethodInfo> info) {
+    method_info_ = std::move(info);
+  }
+
+  // Return the method associated with this call. This is set just before
+  // the call is enqueued onto the service queue, and therefore may be
+  // 'nullptr' for much of the lifecycle of a call.
+  RpcMethodInfo* method_info() {
+    return method_info_.get();
+  }
+
+  // When this InboundCall was received (instantiated).
+  // Should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordCallReceived();
+
+  // When RPC call Handle() was called on the server side.
+  // Updates the Histogram with time elapsed since the call was received,
+  // and should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time);
+
+  // Return true if the deadline set by the client has already elapsed.
+  // In this case, the server may stop processing the call, since the
+  // call response will be ignored anyway.
+  bool ClientTimedOut() const;
+
+  // Return an upper bound on the client timeout deadline. This does not
+  // account for transmission delays between the client and the server.
+  // If the client did not specify a deadline, returns MonoTime::Max().
+  MonoTime GetClientDeadline() const;
+
+  // Return the time when this call was received.
+  MonoTime GetTimeReceived() const;
+
+  // Returns the set of application-specific feature flags required to service
+  // the RPC.
+  std::vector<uint32_t> GetRequiredFeatures() const;
+
+  // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+  // returns an error.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Releases the buffer that contains the request + sidecar data. It is an error to
+  // access sidecars or serialized_request() after this method is called.
+  void DiscardTransfer();
+
+  // Returns the size of the transfer buffer that backs this call. If the transfer does
+  // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0.
+  size_t GetTransferSize();
+
+ private:
+  friend class RpczStore;
+
+  // Serialize and queue the response.
+  void Respond(const google::protobuf::MessageLite& response,
+               bool is_success);
+
+  // Serialize a response message for either success or failure. If it is a success,
+  // 'response' should be the user-defined response type for the call. If it is a
+  // failure, 'response' should be an ErrorStatusPB instance.
+  void SerializeResponseBuffer(const google::protobuf::MessageLite& response,
+                               bool is_success);
+
+  // When RPC call Handle() completed execution on the server side.
+  // Updates the Histogram with time elapsed since the call was started,
+  // and should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordHandlingCompleted();
+
+  // The connection on which this inbound call arrived.
+  scoped_refptr<Connection> conn_;
+
+  // The header of the incoming call. Set by ParseFrom()
+  RequestHeader header_;
+
+  // The serialized bytes of the request param protobuf. Set by ParseFrom().
+  // This references memory held by 'transfer_'.
+  Slice serialized_request_;
+
+  // The transfer that produced the call.
+  // This is kept around because it retains the memory referred to
+  // by 'serialized_request_' above.
+  gscoped_ptr<InboundTransfer> transfer_;
+
+  // The buffers for serialized response. Set by SerializeResponseBuffer().
+  faststring response_hdr_buf_;
+  faststring response_msg_buf_;
+
+  // Vector of additional sidecars that are tacked on to the call's response
+  // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+  // many slices as header_.sidecar_offsets_size().
+  Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
+
+  // The trace buffer.
+  scoped_refptr<Trace> trace_;
+
+  // Timing information related to this RPC call.
+  InboundCallTiming timing_;
+
+  // Proto service this call belongs to. Used for routing.
+  // This field is filled in when the inbound request header is parsed.
+  RemoteMethod remote_method_;
+
+  // After the method has been looked up within the service, this is filled in
+  // to point to the information about this method. Acts as a pointer back to
+  // per-method info such as tracing.
+  scoped_refptr<RpcMethodInfo> method_info_;
+
+  DISALLOW_COPY_AND_ASSIGN(InboundCall);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
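
The header above defines the server-side half of a call's lifecycle. As a
rough illustration, here is a minimal sketch (not part of this commit) of how
a service-queue consumer might drive an InboundCall; HandleQueuedCall is an
invented name, and the error code chosen for the timed-out case is
illustrative rather than necessarily what Kudu's ServicePool uses.

// Hypothetical consumer of the service queue, sketched against the
// InboundCall API above. Real dispatch lives in ServicePool / RpcContext.
#include "kudu/rpc/inbound_call.h"

namespace kudu {
namespace rpc {

void HandleQueuedCall(InboundCall* call,
                      scoped_refptr<Histogram> queue_time_hist) {
  // The reactor thread already ran ParseFrom() and RecordCallReceived();
  // record that handling starts now so queue time lands in the histogram.
  call->RecordHandlingStarted(std::move(queue_time_hist));

  // If the client's deadline has elapsed, the response would be ignored
  // anyway, so skip the work entirely.
  if (call->ClientTimedOut()) {
    call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,  // illustrative code
                         Status::TimedOut("client deadline already elapsed"));
    return;  // RespondFailure enqueues the response and deletes the call.
  }

  // A real handler would now consult call->method_info(), deserialize
  // call->serialized_request() into the request protobuf, invoke the method,
  // and finish with RespondSuccess(user_response).
}

} // namespace rpc
} // namespace kudu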

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
new file mode 100644
index 0000000..28fea55
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.cc
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/messenger.h"
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <list>
+#include <mutex>
+#include <set>
+#include <string>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::string;
+using std::shared_ptr;
+using std::make_shared;
+using strings::Substitute;
+
+DEFINE_string(rpc_authentication, "optional",
+              "Whether to require RPC connections to authenticate. Must be one "
+              "of 'disabled', 'optional', or 'required'. If 'optional', "
+              "authentication will be used when the remote end supports it. If "
+              "'required', connections which are not able to authenticate "
+              "(because the remote end lacks support) are rejected. Secure "
+              "clusters should use 'required'.");
+DEFINE_string(rpc_encryption, "optional",
+              "Whether to require RPC connections to be encrypted. Must be one "
+              "of 'disabled', 'optional', or 'required'. If 'optional', "
+              "encryption will be used when the remote end supports it. If "
+              "'required', connections which are not able to use encryption "
+              "(because the remote end lacks support) are rejected. If 'disabled', "
+              "encryption will not be used, and RPC authentication "
+              "(--rpc_authentication) must also be disabled as well. "
+              "Secure clusters should use 'required'.");
+TAG_FLAG(rpc_authentication, evolving);
+TAG_FLAG(rpc_encryption, evolving);
+
+DEFINE_string(rpc_certificate_file, "",
+              "Path to a PEM encoded X509 certificate to use for securing RPC "
+              "connections with SSL/TLS. If set, '--rpc_private_key_file' and "
+              "'--rpc_ca_certificate_file' must be set as well.");
+DEFINE_string(rpc_private_key_file, "",
+              "Path to a PEM encoded private key paired with the certificate "
+              "from '--rpc_certificate_file'");
+DEFINE_string(rpc_ca_certificate_file, "",
+              "Path to the PEM encoded X509 certificate of the trusted external "
+              "certificate authority. The provided certificate should be the root "
+              "issuer of the certificate passed in '--rpc_certificate_file'.");
+
+// Setting TLS certs and keys via CLI flags is only necessary for external
+// PKI-based security, which is not yet production ready. Instead, see
+// internal PKI (ipki) and Kerberos-based authentication.
+TAG_FLAG(rpc_certificate_file, experimental);
+TAG_FLAG(rpc_private_key_file, experimental);
+TAG_FLAG(rpc_ca_certificate_file, experimental);
+
+DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
+             "If an RPC connection from a client is idle for this amount of time, the server "
+             "will disconnect the client.");
+TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
+
+DECLARE_string(keytab_file);
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+class ServerBuilder;
+
+template <typename T>
+static Status ParseTriState(const char* flag_name, const string& flag_value, T* tri_state) {
+  if (boost::iequals(flag_value, "required")) {
+    *tri_state = T::REQUIRED;
+  } else if (boost::iequals(flag_value, "optional")) {
+    *tri_state = T::OPTIONAL;
+  } else if (boost::iequals(flag_value, "disabled")) {
+    *tri_state = T::DISABLED;
+  } else {
+    return Status::InvalidArgument(Substitute(
+          "$0 flag must be one of 'required', 'optional', or 'disabled'",
+          flag_name));
+  }
+  return Status::OK();
+}
+
+static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) {
+  RpcAuthentication result;
+  Status s = ParseTriState(flag_name, flag_value, &result);
+  if (!s.ok()) {
+    LOG(ERROR) << s.message().ToString();
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication);
+
+static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) {
+  RpcEncryption result;
+  Status s = ParseTriState(flag_name, flag_value, &result);
+  if (!s.ok()) {
+    LOG(ERROR) << s.message().ToString();
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(rpc_encryption, &ValidateRpcEncryption);
+
+static bool ValidateRpcAuthnFlags() {
+  RpcAuthentication authentication;
+  CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication));
+
+  RpcEncryption encryption;
+  CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption));
+
+  if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) {
+    LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled "
+                  "if RPC encryption (--rpc_encryption) is disabled";
+    return false;
+  }
+
+  const bool has_keytab = !FLAGS_keytab_file.empty();
+  const bool has_cert = !FLAGS_rpc_certificate_file.empty();
+  if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) {
+    LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be "
+                  "required unless Kerberos (--keytab_file) or external PKI "
+                  "(--rpc_certificate_file et al) are configured";
+    return false;
+  }
+
+  return true;
+}
+GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags);
+
+static bool ValidateExternalPkiFlags() {
+  bool has_cert = !FLAGS_rpc_certificate_file.empty();
+  bool has_key = !FLAGS_rpc_private_key_file.empty();
+  bool has_ca = !FLAGS_rpc_ca_certificate_file.empty();
+
+  if (has_cert != has_key || has_cert != has_ca) {
+    LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and "
+                  "--rpc_ca_certificate_file flags must be set as a group; "
+                  "i.e. either set all or none of them.";
+    return false;
+  }
+
+  return true;
+}
+GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags);
+
+MessengerBuilder::MessengerBuilder(std::string name)
+    : name_(std::move(name)),
+      connection_keepalive_time_(
+          MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)),
+      num_reactors_(4),
+      min_negotiation_threads_(0),
+      max_negotiation_threads_(4),
+      coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
+      enable_inbound_tls_(false) {
+}
+
+MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const MonoDelta &keepalive) {
+  connection_keepalive_time_ = keepalive;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
+  num_reactors_ = num_reactors;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int min_negotiation_threads) {
+  min_negotiation_threads_ = min_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int max_negotiation_threads) {
+  max_negotiation_threads_ = max_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(const MonoDelta &granularity) {
+  coarse_timer_granularity_ = granularity;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_metric_entity(
+    const scoped_refptr<MetricEntity>& metric_entity) {
+  metric_entity_ = metric_entity;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::enable_inbound_tls() {
+  enable_inbound_tls_ = true;
+  return *this;
+}
+
+Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
+  RETURN_NOT_OK(SaslInit()); // Initialize SASL library before we start making requests
+
+  Messenger* new_msgr(new Messenger(*this));
+
+  auto cleanup = MakeScopedCleanup([&] () {
+      new_msgr->AllExternalReferencesDropped();
+  });
+
+  RETURN_NOT_OK(ParseTriState("--rpc_authentication",
+                              FLAGS_rpc_authentication,
+                              &new_msgr->authentication_));
+
+  RETURN_NOT_OK(ParseTriState("--rpc_encryption",
+                              FLAGS_rpc_encryption,
+                              &new_msgr->encryption_));
+
+  RETURN_NOT_OK(new_msgr->Init());
+  if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
+    auto* tls_context = new_msgr->mutable_tls_context();
+
+    if (!FLAGS_rpc_certificate_file.empty()) {
+      CHECK(!FLAGS_rpc_private_key_file.empty());
+      CHECK(!FLAGS_rpc_ca_certificate_file.empty());
+      // TODO(KUDU-1920): should we try and enforce that the server
+      // is in the subject or alt names of the cert?
+      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(FLAGS_rpc_ca_certificate_file));
+      RETURN_NOT_OK(tls_context->LoadCertificateAndKey(FLAGS_rpc_certificate_file,
+                                                       FLAGS_rpc_private_key_file));
+    } else {
+      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey());
+    }
+  }
+
+  // See docs on Messenger::retain_self_ for info about this odd hack.
+  cleanup.cancel();
+  *msgr = shared_ptr<Messenger>(new_msgr, std::mem_fun(&Messenger::AllExternalReferencesDropped));
+  return Status::OK();
+}
+
+// See comment on Messenger::retain_self_ member.
+void Messenger::AllExternalReferencesDropped() {
+  Shutdown();
+  CHECK(retain_self_.get());
+  // If we have no more external references, then we no longer
+  // need to retain ourself. We'll destruct as soon as all our
+  // internal-facing references are dropped (ie those from reactor
+  // threads).
+  retain_self_.reset();
+}
+
+void Messenger::Shutdown() {
+  // Since we're shutting down, it's OK to block.
+  ThreadRestrictions::ScopedAllowWait allow_wait;
+
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  if (closing_) {
+    return;
+  }
+  VLOG(1) << "shutting down messenger " << name_;
+  closing_ = true;
+
+  DCHECK(rpc_services_.empty()) << "Unregister RPC services before shutting down Messenger";
+  rpc_services_.clear();
+
+  for (const shared_ptr<AcceptorPool>& acceptor_pool : acceptor_pools_) {
+    acceptor_pool->Shutdown();
+  }
+  acceptor_pools_.clear();
+
+  // Need to shut down negotiation pool before the reactors, since the
+  // reactors close the Connection sockets, and may race against the negotiation
+  // threads' blocking reads & writes.
+  negotiation_pool_->Shutdown();
+
+  for (Reactor* reactor : reactors_) {
+    reactor->Shutdown();
+  }
+  tls_context_.reset();
+}
+
+Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr,
+                                  shared_ptr<AcceptorPool>* pool) {
+  // Before listening, if we expect to require Kerberos, we want to verify
+  // that everything is set up correctly. This way we'll generate errors on
+  // startup rather than later on when we first receive a client connection.
+  if (!FLAGS_keytab_file.empty()) {
+    RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(),
+                          "GSSAPI/Kerberos not properly configured");
+  }
+
+  Socket sock;
+  RETURN_NOT_OK(sock.Init(0));
+  RETURN_NOT_OK(sock.SetReuseAddr(true));
+  RETURN_NOT_OK(sock.Bind(accept_addr));
+  Sockaddr remote;
+  RETURN_NOT_OK(sock.GetSocketAddress(&remote));
+  auto acceptor_pool(make_shared<AcceptorPool>(this, &sock, remote));
+
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  acceptor_pools_.push_back(acceptor_pool);
+  pool->swap(acceptor_pool);
+  return Status::OK();
+}
+
+// Register a new RpcService to handle inbound requests.
+Status Messenger::RegisterService(const string& service_name,
+                                  const scoped_refptr<RpcService>& service) {
+  DCHECK(service);
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  if (InsertIfNotPresent(&rpc_services_, service_name, service)) {
+    return Status::OK();
+  } else {
+    return Status::AlreadyPresent("This service is already present");
+  }
+}
+
+Status Messenger::UnregisterAllServices() {
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  rpc_services_.clear();
+  return Status::OK();
+}
+
+// Unregister an RpcService.
+Status Messenger::UnregisterService(const string& service_name) {
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  if (rpc_services_.erase(service_name)) {
+    return Status::OK();
+  } else {
+    return Status::ServiceUnavailable(Substitute("service $0 not registered on $1",
+                 service_name, name_));
+  }
+}
+
+void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
+  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+  reactor->QueueOutboundCall(call);
+}
+
+void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+  shared_lock<rw_spinlock> guard(lock_.get_lock());
+  scoped_refptr<RpcService>* service = FindOrNull(rpc_services_,
+                                                  call->remote_method().service_name());
+  if (PREDICT_FALSE(!service)) {
+    Status s = Status::ServiceUnavailable(Substitute("service $0 not registered on $1",
+                                                      call->remote_method().service_name(), name_));
+    LOG(INFO) << s.ToString();
+    call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, s);
+    return;
+  }
+
+  call->set_method_info((*service)->LookupMethod(call->remote_method()));
+
+  // The RpcService will respond to the client on success or failure.
+  WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
+}
+
+void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
+  Reactor *reactor = RemoteToReactor(remote);
+  reactor->RegisterInboundSocket(new_socket, remote);
+}
+
+Messenger::Messenger(const MessengerBuilder &bld)
+  : name_(bld.name_),
+    closing_(false),
+    authentication_(RpcAuthentication::REQUIRED),
+    encryption_(RpcEncryption::REQUIRED),
+    tls_context_(new security::TlsContext()),
+    token_verifier_(new security::TokenVerifier()),
+    rpcz_store_(new RpczStore()),
+    metric_entity_(bld.metric_entity_),
+    retain_self_(this) {
+  for (int i = 0; i < bld.num_reactors_; i++) {
+    reactors_.push_back(new Reactor(retain_self_, i, bld));
+  }
+  CHECK_OK(ThreadPoolBuilder("negotiator")
+              .set_min_threads(bld.min_negotiation_threads_)
+              .set_max_threads(bld.max_negotiation_threads_)
+              .Build(&negotiation_pool_));
+}
+
+Messenger::~Messenger() {
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  CHECK(closing_) << "Should have already shut down";
+  STLDeleteElements(&reactors_);
+}
+
+Reactor* Messenger::RemoteToReactor(const Sockaddr &remote) {
+  uint32_t hashCode = remote.HashCode();
+  int reactor_idx = hashCode % reactors_.size();
+  // This is just a static partitioning; we could get a lot
+  // fancier with assigning Sockaddrs to Reactors.
+  return reactors_[reactor_idx];
+}
+
+Status Messenger::Init() {
+  RETURN_NOT_OK(tls_context_->Init());
+  for (Reactor* r : reactors_) {
+    RETURN_NOT_OK(r->Init());
+  }
+
+  return Status::OK();
+}
+
+Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                  DumpRunningRpcsResponsePB* resp) {
+  shared_lock<rw_spinlock> guard(lock_.get_lock());
+  for (Reactor* reactor : reactors_) {
+    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
+  }
+  return Status::OK();
+}
+
+void Messenger::ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+                                  MonoDelta when) {
+  DCHECK(!reactors_.empty());
+
+  // If we're already running on a reactor thread, reuse it.
+  Reactor* chosen = nullptr;
+  for (Reactor* r : reactors_) {
+    if (r->IsCurrentThread()) {
+      chosen = r;
+    }
+  }
+  if (chosen == nullptr) {
+    // Not running on a reactor thread, pick one at random.
+    chosen = reactors_[rand() % reactors_.size()];
+  }
+
+  DelayedTask* task = new DelayedTask(func, when);
+  chosen->ScheduleReactorTask(task);
+}
+
+const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const {
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  scoped_refptr<RpcService> service;
+  if (FindCopy(rpc_services_, service_name, &service)) {
+    return service;
+  } else {
+    return scoped_refptr<RpcService>(nullptr);
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
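
The builder, acceptor-pool, and service-registration APIs above combine in a
fairly fixed pattern. Below is a hedged bring-up sketch (the service name and
the pre-built 'service' argument are placeholders); it mirrors the pattern
used in mt-rpc-test.cc further down.

// Sketch: build a Messenger, listen on an ephemeral port, register a service.
#include <memory>

#include "kudu/rpc/acceptor_pool.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"

namespace kudu {
namespace rpc {

Status StartRpcServer(const scoped_refptr<RpcService>& service,  // placeholder
                      std::shared_ptr<Messenger>* messenger,
                      Sockaddr* bound_addr) {
  MessengerBuilder bld("example-server");
  bld.set_num_reactors(4);
  RETURN_NOT_OK(bld.Build(messenger));

  // A default-constructed Sockaddr requests a wildcard/ephemeral address;
  // the pool reports where it actually bound. Pools accept nothing until
  // Start() is called.
  std::shared_ptr<AcceptorPool> pool;
  RETURN_NOT_OK((*messenger)->AddAcceptorPool(Sockaddr(), &pool));
  RETURN_NOT_OK(pool->Start(2));
  *bound_addr = pool->bind_address();

  return (*messenger)->RegisterService("example.Service", service);
}

} // namespace rpc
} // namespace kudu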

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
new file mode 100644
index 0000000..1ba76a7
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.h
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_MESSENGER_H
+#define KUDU_RPC_MESSENGER_H
+
+#include <stdint.h>
+
+#include <list>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Socket;
+class ThreadPool;
+
+namespace security {
+class TlsContext;
+class TokenVerifier;
+}
+
+namespace rpc {
+
+class AcceptorPool;
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class InboundCall;
+class Messenger;
+class OutboundCall;
+class Reactor;
+class ReactorThread;
+class RpcService;
+class RpczStore;
+
+struct AcceptorPoolInfo {
+ public:
+  explicit AcceptorPoolInfo(Sockaddr bind_address)
+      : bind_address_(std::move(bind_address)) {}
+
+  Sockaddr bind_address() const {
+    return bind_address_;
+  }
+
+ private:
+  Sockaddr bind_address_;
+};
+
+// Authentication configuration for RPC connections.
+enum class RpcAuthentication {
+  DISABLED,
+  OPTIONAL,
+  REQUIRED,
+};
+
+// Encryption configuration for RPC connections.
+enum class RpcEncryption {
+  DISABLED,
+  OPTIONAL,
+  REQUIRED,
+};
+
+// Used to construct a Messenger.
+class MessengerBuilder {
+ public:
+  friend class Messenger;
+  friend class ReactorThread;
+
+  explicit MessengerBuilder(std::string name);
+
+  // Set the length of time we will keep a TCP connection alive with no traffic.
+  MessengerBuilder &set_connection_keepalive_time(const MonoDelta &keepalive);
+
+  // Set the number of reactor threads that will be used for sending and
+  // receiving.
+  MessengerBuilder &set_num_reactors(int num_reactors);
+
+  // Set the minimum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads);
+
+  // Set the maximum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads);
+
+  // Set the granularity with which connections are checked for keepalive.
+  MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity);
+
+  // Set metric entity for use by RPC systems.
+  MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);
+
+  // Configure the messenger to enable TLS encryption on inbound connections.
+  MessengerBuilder& enable_inbound_tls();
+
+  Status Build(std::shared_ptr<Messenger> *msgr);
+
+ private:
+  const std::string name_;
+  MonoDelta connection_keepalive_time_;
+  int num_reactors_;
+  int min_negotiation_threads_;
+  int max_negotiation_threads_;
+  MonoDelta coarse_timer_granularity_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  bool enable_inbound_tls_;
+};
+
+// A Messenger is a container for the reactor threads which run event loops
+// for the RPC services. If the process is a server, a Messenger can also have
+// one or more attached AcceptorPools which accept RPC connections. In this case,
+// calls received over the connection are enqueued into the messenger's service_queue
+// for processing by a ServicePool.
+//
+// Users do not typically interact with the Messenger directly except to create
+// one as a singleton, and then make calls using Proxy objects.
+//
+// See rpc-test.cc and rpc-bench.cc for example usages.
+class Messenger {
+ public:
+  friend class MessengerBuilder;
+  friend class Proxy;
+  friend class Reactor;
+  typedef std::vector<std::shared_ptr<AcceptorPool> > acceptor_vec_t;
+  typedef std::unordered_map<std::string, scoped_refptr<RpcService> > RpcServicesMap;
+
+  static const uint64_t UNKNOWN_CALL_ID = 0;
+
+  ~Messenger();
+
+  // Stop all communication and prevent further use.
+  // It's not required to call this -- dropping the shared_ptr provided
+  // from MessengerBuilder::Build will automatically call this method.
+  void Shutdown();
+
+  // Add a new acceptor pool listening to the given accept address.
+  // You can create any number of acceptor pools you want, including none.
+  //
+  // The created pool is returned in *pool. The Messenger also retains
+  // a reference to the pool, so the caller may safely drop this reference
+  // and the pool will remain live.
+  //
+  // NOTE: the returned pool is not initially started. You must call
+  // pool->Start(...) to begin accepting connections.
+  //
+  // If Kerberos is enabled, this also runs a pre-flight check that makes
+  // sure the environment is appropriately configured to authenticate
+  // clients via Kerberos. If not, this returns a RuntimeError.
+  Status AddAcceptorPool(const Sockaddr &accept_addr,
+                         std::shared_ptr<AcceptorPool>* pool);
+
+  // Register a new RpcService to handle inbound requests.
+  Status RegisterService(const std::string& service_name,
+                         const scoped_refptr<RpcService>& service);
+
+  // Unregister currently-registered RpcService.
+  Status UnregisterService(const std::string& service_name);
+
+  Status UnregisterAllServices();
+
+  // Queue a call for transmission. This will pick the appropriate reactor,
+  // and enqueue a task on that reactor to assign and send the call.
+  void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Enqueue a call for processing on the server.
+  void QueueInboundCall(gscoped_ptr<InboundCall> call);
+
+  // Take ownership of the socket via Socket::Release
+  void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
+
+  // Dump the current RPCs into the given protobuf.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Run 'func' on a reactor thread after 'when' time elapses.
+  //
+  // The status argument conveys whether 'func' was run correctly (i.e.
+  // after the elapsed time) or not.
+  void ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+                         MonoDelta when);
+
+  const security::TlsContext& tls_context() const { return *tls_context_; }
+  security::TlsContext* mutable_tls_context() { return tls_context_.get(); }
+
+  const security::TokenVerifier& token_verifier() const { return *token_verifier_; }
+  security::TokenVerifier* mutable_token_verifier() { return token_verifier_.get(); }
+  std::shared_ptr<security::TokenVerifier> shared_token_verifier() const {
+    return token_verifier_;
+  }
+
+  boost::optional<security::SignedTokenPB> authn_token() const {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    return authn_token_;
+  }
+  void set_authn_token(const security::SignedTokenPB& token) {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    authn_token_ = token;
+  }
+
+  RpcAuthentication authentication() const { return authentication_; }
+  RpcEncryption encryption() const { return encryption_; }
+
+  ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
+
+  RpczStore* rpcz_store() { return rpcz_store_.get(); }
+
+  int num_reactors() const { return reactors_.size(); }
+
+  const std::string& name() const {
+    return name_;
+  }
+
+  bool closing() const {
+    shared_lock<rw_spinlock> l(lock_.get_lock());
+    return closing_;
+  }
+
+  scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_.get(); }
+
+  const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const;
+
+ private:
+  FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestCredentialsPolicy);
+  FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
+
+  explicit Messenger(const MessengerBuilder &bld);
+
+  Reactor* RemoteToReactor(const Sockaddr &remote);
+  Status Init();
+  void RunTimeoutThread();
+  void UpdateCurTime();
+
+  // Called by external-facing shared_ptr when the user no longer holds
+  // any references. See 'retain_self_' for more info.
+  void AllExternalReferencesDropped();
+
+  const std::string name_;
+
+  // Protects closing_, acceptor_pools_, rpc_services_.
+  mutable percpu_rwlock lock_;
+
+  bool closing_;
+
+  // Whether to require authentication and encryption on the connections managed
+  // by this messenger.
+  // TODO(KUDU-1928): scope these to individual proxies, so that messengers can be
+  // reused by different clients.
+  RpcAuthentication authentication_;
+  RpcEncryption encryption_;
+
+  // Pools which are listening on behalf of this messenger.
+  // Note that the user may have called Shutdown() on one of these
+  // pools, so even though we retain the reference, it may no longer
+  // be listening.
+  acceptor_vec_t acceptor_pools_;
+
+  // RPC services that handle inbound requests.
+  RpcServicesMap rpc_services_;
+
+  std::vector<Reactor*> reactors_;
+
+  gscoped_ptr<ThreadPool> negotiation_pool_;
+
+  std::unique_ptr<security::TlsContext> tls_context_;
+
+  // A TokenVerifier, which can verify client provided authentication tokens.
+  std::shared_ptr<security::TokenVerifier> token_verifier_;
+
+  // An optional token, which can be used to authenticate to a server.
+  mutable simple_spinlock authn_token_lock_;
+  boost::optional<security::SignedTokenPB> authn_token_;
+
+  std::unique_ptr<RpczStore> rpcz_store_;
+
+  scoped_refptr<MetricEntity> metric_entity_;
+
+  // The ownership of the Messenger object is somewhat subtle. The pointer graph
+  // looks like this:
+  //
+  //    [User Code ]             |      [ Internal code ]
+  //                             |
+  //     shared_ptr[1]           |
+  //         |                   |
+  //         v
+  //      Messenger    <------------ shared_ptr[2] --- Reactor
+  //       ^    |       ----------- bare pointer --> Reactor
+  //        \__/
+  //     shared_ptr[2]
+  //     (retain_self_)
+  //
+  // shared_ptr[1] instances use Messenger::AllExternalReferencesDropped()
+  //   as a deleter.
+  // shared_ptr[2] are "traditional" shared_ptrs which call 'delete' on the
+  //   object.
+  //
+  // The teardown sequence is as follows:
+  // Option 1): User calls "Shutdown()" explicitly:
+  //  - Messenger::Shutdown tells Reactors to shut down
+  //  - When each reactor thread finishes, it drops its shared_ptr[2]
+  //  - the Messenger::retain_self_ instance remains, keeping the Messenger
+  //    alive.
+  //  - The user eventually drops its shared_ptr[1], which calls
+  //    Messenger::AllExternalReferencesDropped. This drops retain_self_
+  //    and results in object destruction.
+  // Option 2): User drops all of its shared_ptr[1] references
+  //  - Though the Reactors still reference the Messenger, AllExternalReferencesDropped
+  //    will get called, which triggers Messenger::Shutdown.
+  //  - AllExternalReferencesDropped drops retain_self_, so the only remaining
+  //    references are from Reactor threads. But the reactor threads are shutting down.
+  //  - When the last Reactor thread dies, there will be no more shared_ptr[2] references
+  //    and the Messenger will be destroyed.
+  //
+  // The main goal of all of this confusion is that the reactor threads need to be able
+  // to shut down asynchronously, and we need to keep the Messenger alive until they
+  // do so. So, handing out a normal shared_ptr to users would force the Messenger
+  // destructor to Join() the reactor threads, which causes a problem if the user
+  // tries to destruct the Messenger from within a Reactor thread itself.
+  std::shared_ptr<Messenger> retain_self_;
+
+  DISALLOW_COPY_AND_ASSIGN(Messenger);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
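
The long ownership comment above describes a reusable idiom. Reduced to a
standalone sketch (Widget is an invented stand-in, not Messenger), the
"retain self plus custom-deleter handle" pattern looks like this:

// External handles never delete the object directly; their deleter only
// drops the self-reference, and actual deletion happens when the last
// internal shared_ptr goes away.
#include <memory>

class Widget {
 public:
  static std::shared_ptr<Widget> Create() {
    Widget* w = new Widget();  // retain_self_ now owns w (default deleter)
    // External handle (shared_ptr[1] in the diagram above): its "deleter"
    // notifies the object instead of deleting it, intentionally creating a
    // second, independent control block.
    return std::shared_ptr<Widget>(
        w, [](Widget* raw) { raw->AllExternalReferencesDropped(); });
  }

  // Internal threads copy this (shared_ptr[2]) to keep the object alive.
  std::shared_ptr<Widget> self() const { return retain_self_; }

 private:
  Widget() : retain_self_(this) {}
  void AllExternalReferencesDropped() { retain_self_.reset(); }

  std::shared_ptr<Widget> retain_self_;
};

Dropping the last external handle triggers AllExternalReferencesDropped(),
which matches teardown option 2 above: destruction is deferred until every
internal shared_ptr[2] copy is gone.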

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/mt-rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc
new file mode 100644
index 0000000..73e3a13
--- /dev/null
+++ b/be/src/kudu/rpc/mt-rpc-test.cc
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <string>
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_counter(rpc_connections_accepted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+
+using std::string;
+using std::shared_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+class MultiThreadedRpcTest : public RpcTestBase {
+ public:
+  // Make a single RPC call.
+  void SingleCall(Sockaddr server_addr, const char* method_name,
+                  Status* result, CountDownLatch* latch) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    shared_ptr<Messenger> client_messenger(CreateMessenger("ClientSC"));
+    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+    *result = DoTestSyncCall(p, method_name);
+    latch->CountDown();
+  }
+
+  // Make RPC calls until we see a failure.
+  void HammerServer(Sockaddr server_addr, const char* method_name,
+                    Status* last_result) {
+    shared_ptr<Messenger> client_messenger(CreateMessenger("ClientHS"));
+    HammerServerWithMessenger(server_addr, method_name, last_result, client_messenger);
+  }
+
+  void HammerServerWithMessenger(
+      Sockaddr server_addr, const char* method_name, Status* last_result,
+      const shared_ptr<Messenger>& messenger) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+
+    int i = 0;
+    while (true) {
+      i++;
+      Status s = DoTestSyncCall(p, method_name);
+      if (!s.ok()) {
+        // Return on first failure.
+        LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
+            << s.ToString();
+        *last_result = s;
+        return;
+      }
+    }
+  }
+};
+
+static void AssertShutdown(kudu::Thread* thread, const Status* status) {
+  ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join());
+  string msg = status->ToString();
+  ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
+              msg.find("Network error") != string::npos)
+              << "Status is actually: " << msg;
+}
+
+// Test making several concurrent RPC calls while shutting down.
+// Simply verify that we don't hit any CHECK errors.
+TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
+  // Set up server.
+  Sockaddr server_addr;
+  StartTestServer(&server_addr);
+
+  const int kNumThreads = 4;
+  scoped_refptr<kudu::Thread> threads[kNumThreads];
+  Status statuses[kNumThreads];
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+      &MultiThreadedRpcTest::HammerServer, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &statuses[i], &threads[i]));
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Shut down server.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (int i = 0; i < kNumThreads; i++) {
+    AssertShutdown(threads[i].get(), &statuses[i]);
+  }
+}
+
+// Test shutting down the client messenger exactly as a thread is about to start
+// a new connection. This is a regression test for KUDU-104.
+TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
+  // Set up server.
+  Sockaddr server_addr;
+  StartTestServer(&server_addr);
+
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
+
+  scoped_refptr<kudu::Thread> thread;
+  Status status;
+  ASSERT_OK(kudu::Thread::Create("test", "test",
+      &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &status, client_messenger, &thread));
+
+  // Shut down the messenger after a very brief sleep. This often will race so that the
+  // call gets submitted to the messenger before shutdown, but the negotiation won't have
+  // started yet. In a debug build this fails about half the time without the bug fix.
+  // See KUDU-104.
+  SleepFor(MonoDelta::FromMicroseconds(10));
+  client_messenger->Shutdown();
+  client_messenger.reset();
+
+  ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+  ASSERT_TRUE(status.IsAborted() ||
+              status.IsServiceUnavailable());
+  string msg = status.ToString();
+  SCOPED_TRACE(msg);
+  ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
+              msg.find("reactor is shutting down") != string::npos ||
+              msg.find("Unable to start connection negotiation thread") != string::npos)
+              << "Status is actually: " << msg;
+}
+
+// This bogus service pool leaves the service queue full.
+class BogusServicePool : public ServicePool {
+ public:
+  BogusServicePool(gscoped_ptr<ServiceIf> service,
+                   const scoped_refptr<MetricEntity>& metric_entity,
+                   size_t service_queue_length)
+    : ServicePool(std::move(service), metric_entity, service_queue_length) {
+  }
+  virtual Status Init(int num_threads) OVERRIDE {
+    // Do nothing
+    return Status::OK();
+  }
+};
+
+void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
+  string msg = status->ToString();
+  if (msg.find("service queue is full") != string::npos) {
+    ++(*backpressure);
+  } else if (msg.find("shutting down") != string::npos) {
+    ++(*shutdown);
+  } else if (msg.find("got EOF from remote") != string::npos) {
+    ++(*shutdown);
+  } else {
+    FAIL() << "Unexpected status message: " << msg;
+  }
+}
+
+// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
+TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
+  const size_t kMaxConcurrency = 2;
+
+  MessengerBuilder bld("messenger1");
+  bld.set_num_reactors(kMaxConcurrency);
+  bld.set_metric_entity(metric_entity_);
+  CHECK_OK(bld.Build(&server_messenger_));
+
+  shared_ptr<AcceptorPool> pool;
+  ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
+  ASSERT_OK(pool->Start(kMaxConcurrency));
+  Sockaddr server_addr = pool->bind_address();
+
+  gscoped_ptr<ServiceIf> service(new GenericCalculatorService());
+  service_name_ = service->service_name();
+  service_pool_ = new BogusServicePool(std::move(service),
+                                      server_messenger_->metric_entity(),
+                                      kMaxConcurrency);
+  ASSERT_OK(service_pool_->Init(n_worker_threads_));
+  server_messenger_->RegisterService(service_name_, service_pool_);
+
+  scoped_refptr<kudu::Thread> threads[3];
+  Status status[3];
+  CountDownLatch latch(1);
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+      &MultiThreadedRpcTest::SingleCall, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i]));
+  }
+
+  // One should immediately fail due to backpressure. The latch is only initialized
+  // to wait for the first of three threads to finish.
+  latch.Wait();
+
+  // The rest would time out after 10 sec, but we help them along.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (const auto& thread : threads) {
+    ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+  }
+
+  // Verify that one error was due to backpressure.
+  int errors_backpressure = 0;
+  int errors_shutdown = 0;
+
+  for (const auto& s : status) {
+    IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
+  }
+
+  ASSERT_EQ(1, errors_backpressure);
+  ASSERT_EQ(2, errors_shutdown);
+
+  // Check that RPC queue overflow metric is 1
+  Counter *rpcs_queue_overflow =
+    METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get();
+  ASSERT_EQ(1, rpcs_queue_overflow->value());
+}
+
+static void HammerServerWithTCPConns(const Sockaddr& addr) {
+  while (true) {
+    Socket socket;
+    CHECK_OK(socket.Init(0));
+    Status s;
+    LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
+      s = socket.Connect(addr);
+    }
+    if (!s.ok()) {
+      CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString();
+      return;
+    }
+    CHECK_OK(socket.Close());
+  }
+}
+
+// Regression test for KUDU-128.
+// Test that shuts down the server while new TCP connections are incoming.
+TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
+  // Set up server.
+  Sockaddr server_addr;
+  StartTestServer(&server_addr);
+
+  // Start a number of threads which just hammer the server with TCP connections.
+  vector<scoped_refptr<kudu::Thread> > threads;
+  for (int i = 0; i < 8; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+        &HammerServerWithTCPConns, server_addr, &new_thread));
+    threads.push_back(new_thread);
+  }
+
+  // Sleep until the server has started to actually accept some connections from the
+  // test threads.
+  scoped_refptr<Counter> conns_accepted =
+    METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity());
+  while (conns_accepted->value() == 0) {
+    SleepFor(MonoDelta::FromMicroseconds(100));
+  }
+
+  // Shutdown while there are still new connections appearing.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (scoped_refptr<kudu::Thread>& t : threads) {
+    ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join());
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
+
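
One Messenger facility the tests above do not exercise is delayed scheduling.
Here is a hedged sketch of Messenger::ScheduleOnReactor (RetryPing and
ScheduleRetry are invented names; per messenger.h, the status argument
reports whether the task ran after the delay or was aborted, e.g. by
shutdown):

#include <memory>

#include <glog/logging.h>

#include "kudu/rpc/messenger.h"
#include "kudu/util/monotime.h"

namespace kudu {
namespace rpc {

static void RetryPing(const Status& status) {
  if (!status.ok()) {
    // The reactor shut down (or otherwise failed) before the delay elapsed.
    LOG(WARNING) << "delayed task aborted: " << status.ToString();
    return;
  }
  // ... perform the delayed work on the reactor thread ...
}

void ScheduleRetry(const std::shared_ptr<Messenger>& messenger) {
  // A plain function pointer converts to the boost::function<> parameter.
  messenger->ScheduleOnReactor(&RetryPing, MonoDelta::FromMilliseconds(100));
}

} // namespace rpc
} // namespace kudu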

