impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [08/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8
Date Thu, 17 Aug 2017 03:14:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.h b/be/src/kudu/rpc/rpc_controller.h
new file mode 100644
index 0000000..ab611a8
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTROLLER_H
+#define KUDU_RPC_RPC_CONTROLLER_H
+
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+// Forward declaration only: RpcController refers to Message solely by
+// reference (see SetRequestParam()), so the full protobuf header is not needed.
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+namespace rpc {
+
+// Forward declarations. RpcController uses these types only through pointers,
+// references, or smart pointers, so their definitions are not needed here.
+class ErrorStatusPB;
+class OutboundCall;
+class RequestIdPB;
+class RpcSidecar;
+
+// Authentication credentials policy for outbound RPCs. Some RPC methods
+// (e.g. MasterService::ConnectToMaster) behave differently depending on the
+// type of credentials used for authentication when establishing the connection.
+// A client that expects particular results from such a call should specify
+// the required policy on a per-call basis using RpcController. By default,
+// RpcController uses ANY_CREDENTIALS.
+enum class CredentialsPolicy {
+  // Authentication credentials of any type, primary or secondary, are
+  // acceptable.
+  ANY_CREDENTIALS,
+
+  // Only primary credentials are acceptable. Primary credentials are Kerberos
+  // tickets and TLS certificates. Secondary credentials are authentication
+  // tokens: they are 'derived' in the sense that they can be acquired using
+  // 'primary' credentials.
+  PRIMARY_CREDENTIALS,
+};
+
+// Controller for managing properties of a single RPC call, on the client side.
+//
+// An RpcController maps to exactly one call and is not thread-safe. The client
+// may use this class prior to sending an RPC in order to set properties such
+// as the call's timeout.
+//
+// After the call has been sent (e.g. using Proxy::AsyncRequest()) the user
+// may invoke methods on the RpcController object in order to probe the status
+// of the call.
+class RpcController {
+ public:
+  RpcController();
+  ~RpcController();
+
+  // Swap the state of the controller (including ownership of sidecars, buffers,
+  // etc.) with another one.
+  void Swap(RpcController* other);
+
+  // Reset this controller so it may be used with another call.
+  // Note that this also clears the required server features.
+  void Reset();
+
+  // Return true if the call has finished.
+  // A call is finished if the server has responded, or if the call
+  // has timed out.
+  bool finished() const;
+
+  // Whether the call failed due to a connection negotiation error.
+  bool negotiation_failed() const;
+
+  // Return the current status of a call.
+  //
+  // A call is "OK" status until it finishes, at which point it may
+  // either remain in "OK" status (if the call was successful), or
+  // change to an error status. Error status indicates that there was
+  // some RPC-layer issue with making the call, for example, one of:
+  //
+  // * failed to establish a connection to the server
+  // * the server was too busy to handle the request
+  // * the server was unable to interpret the request (e.g. due to a version
+  //   mismatch)
+  // * a network error occurred which caused the connection to be torn
+  //   down
+  // * the call timed out
+  Status status() const;
+
+  // If status() returns a RemoteError object, then this function returns
+  // the error response provided by the server. Service implementors may
+  // use protobuf Extensions to add application-specific data to this PB.
+  //
+  // If Status was not a RemoteError, this returns NULL.
+  // The returned pointer is valid only for the lifetime of the controller
+  // object.
+  const ErrorStatusPB* error_response() const;
+
+  // Set the timeout for the call to be made with this RPC controller.
+  //
+  // The configured timeout applies to the entire time period between
+  // the AsyncRequest() method call and getting a response. For example,
+  // if it takes too long to establish a connection to the remote host,
+  // or to DNS-resolve the remote host, those will be accounted as part
+  // of the timeout period.
+  //
+  // Timeouts must be set prior to making the request -- the timeout may
+  // not currently be adjusted for an already-sent call.
+  //
+  // Using an uninitialized timeout will result in a call which never
+  // times out (not recommended!)
+  void set_timeout(const MonoDelta& timeout);
+
+  // Like a timeout, but based on a fixed point in time instead of a delta.
+  //
+  // Using an uninitialized deadline means the call won't time out.
+  void set_deadline(const MonoTime& deadline);
+
+  // Allows setting the request id for the next request sent to the server.
+  // A request id allows the server to uniquely identify each request sent by
+  // the client, in some cases even when sent to multiple servers, enabling
+  // exactly-once semantics.
+  void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
+
+  // Returns whether a request id has been set on RPC header.
+  bool has_request_id() const;
+
+  // Returns the currently set request id.
+  // When the request is sent to the server, the id is "moved" out of the
+  // RpcController (ownership goes to the OutboundCall), so has_request_id()
+  // returning false after the call was sent does not mean that no request id
+  // was sent.
+  // REQUIRES: the controller has a request ID set.
+  const RequestIdPB& request_id() const;
+
+  // Add a requirement that the server side must support a feature with the
+  // given identifier. The set of required features is sent to the server
+  // with the RPC call, and if any required feature is not supported, the
+  // call will fail with a NotSupported() status.
+  //
+  // This can be used when an RPC call changes in a way that is protobuf-compatible,
+  // but for which it would not be appropriate for the server to simply ignore
+  // an added field. For example, consider an API call like:
+  //
+  //   message DeleteAccount {
+  //     optional string username = 1;
+  //     optional bool dry_run = 2; // ADDED LATER!
+  //   }
+  //
+  // In this case, if a new client which supports the 'dry_run' flag sends the RPC
+  // to an old server, the old server will simply ignore the unrecognized parameter,
+  // with highly problematic results. To solve this problem, the new version can
+  // add a feature flag:
+  //
+  //   In .proto file
+  //   ----------------
+  //   enum MyFeatureFlags {
+  //     UNKNOWN = 0;
+  //     DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1;
+  //   }
+  //
+  //   In client code:
+  //   ---------------
+  //   if (dry_run) {
+  //     rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN);
+  //     req.set_dry_run(true);
+  //   }
+  //
+  // This has the effect of (a) maintaining compatibility when dry_run is not specified
+  // and (b) rejecting the RPC with a "NotSupported" error when it is.
+  //
+  // NOTE: 'feature' is an int rather than an enum type because each service
+  // must define its own enum of supported features, and protobuf doesn't support
+  // any ability to 'extend' enum types. Implementers should define an enum in the
+  // service's protobuf definition as shown above.
+  void RequireServerFeature(uint32_t feature);
+
+  // Returns a read-only reference to the set of feature flags that the server
+  // is required to support for this call (see RequireServerFeature()).
+  const std::unordered_set<uint32_t>& required_server_features() const {
+    return required_server_features_;
+  }
+
+  // Return the configured timeout.
+  MonoDelta timeout() const;
+
+  // Returns the authentication credentials policy for this call.
+  CredentialsPolicy credentials_policy() const {
+    return credentials_policy_;
+  }
+
+  // Sets the authentication credentials policy for this call.
+  void set_credentials_policy(CredentialsPolicy policy) {
+    credentials_policy_ = policy;
+  }
+
+  // Fills the 'sidecar' parameter with the slice pointing to the i-th
+  // sidecar upon success.
+  //
+  // Should only be called if the call has finished, but the controller has not
+  // been Reset().
+  //
+  // May fail if index is invalid.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Adds a sidecar to the outbound request. The index of the sidecar is written to
+  // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+  // to this request.
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+ private:
+  friend class OutboundCall;
+  friend class Proxy;
+
+  // Set the outbound call_'s request parameter, and transfer ownership of
+  // outbound_sidecars_ to call_ in preparation for serialization.
+  void SetRequestParam(const google::protobuf::Message& req);
+
+  MonoDelta timeout_;
+
+  // Feature flags the server must support; exposed via
+  // required_server_features().
+  std::unordered_set<uint32_t> required_server_features_;
+
+  // RPC authentication policy for outbound calls.
+  CredentialsPolicy credentials_policy_;
+
+  // NOTE(review): the class is documented as not thread-safe, so this lock
+  // presumably guards state shared with the in-flight OutboundCall (e.g.
+  // call_); confirm which members it protects in rpc_controller.cc.
+  mutable simple_spinlock lock_;
+
+  // The id of this request.
+  // Ownership is transferred to OutboundCall once the call is sent.
+  std::unique_ptr<RequestIdPB> request_id_;
+
+  // Once the call is sent, it is tracked here.
+  std::shared_ptr<OutboundCall> call_;
+
+  // Sidecars added via AddOutboundSidecar(). Ownership is handed to call_ by
+  // SetRequestParam().
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  DISALLOW_COPY_AND_ASSIGN(RpcController);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_header.proto b/be/src/kudu/rpc/rpc_header.proto
new file mode 100644
index 0000000..1d55b6a
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_header.proto
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+
+option optimize_for = SPEED;
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu.rpc";
+
+import "google/protobuf/descriptor.proto";
+import "kudu/security/token.proto";
+import "kudu/util/pb_util.proto";
+
+// The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase.
+// See the following for reference on those other protocols:
+//  - https://issues.apache.org/jira/browse/HBASE-7898
+//  - https://issues.apache.org/jira/browse/HADOOP-8990
+//
+// For a description of the Kudu protocol, see 'README' in this directory.
+
+// User information. Carried in ConnectionContextPB.DEPRECATED_user_info on
+// connection setup; that field is deprecated (see ConnectionContextPB).
+message UserInformationPB {
+  // The user to act as (impersonation). Per the ConnectionContextPB comments,
+  // impersonation was never supported.
+  optional string effective_user = 1;
+  // The real (authenticated) user. Per ConnectionContextPB, this is now taken
+  // from the SASL negotiation instead.
+  required string real_user = 2;
+}
+
+// The connection context is sent as part of the connection establishment.
+// It establishes the context for ALL RPC calls within the connection.
+// This is sent on connection setup after the connection preamble is sent
+// and SASL has been negotiated.
+// No response is sent from the server to the client.
+message ConnectionContextPB {
+  // UserInfo beyond what is determined as part of the security handshake
+  // at connection time (Kerberos, tokens, etc.).
+  //
+  // DEPRECATED: No longer used in Kudu 1.1 and later.
+  // The 'real_user' should be taken from the SASL negotiation.
+  // Impersonation (effective user) was never supported; support for it would
+  // have to be added back at some later point.
+  optional UserInformationPB DEPRECATED_user_info = 2;
+
+  // If the server sends a nonce to the client during the SASL_SUCCESS
+  // negotiation step (NegotiatePB.nonce), the client is required to encode it
+  // with SASL integrity protection and return it in this field. The nonce
+  // protects the server against a Kerberos replay attack.
+  optional bytes encoded_nonce = 3 [(REDACT) = true];
+}
+
+// Features supported by the RPC system itself.
+//
+// Note that this should be used to evolve the RPC _system_, not the semantics
+// or compatibility of individual calls.
+//
+// For example, if we were to add a feature like call or response wire
+// compression in the future, we could add a flag here to indicate that the
+// client or server supports that feature. Optional features which may safely be
+// ignored by the receiver do not need a feature flag; instead, the optional
+// field feature of protobuf may be used.
+enum RpcFeatureFlag {
+  // Not a real feature. Listed first, so it is the proto2 default value for
+  // fields of this enum type.
+  UNKNOWN = 0;
+
+  // The RPC system is required to support application feature flags in the
+  // request and response headers.
+  APPLICATION_FEATURE_FLAGS = 1;
+
+  // The RPC system supports TLS protected connections. If both sides support
+  // this flag, the connection will automatically be wrapped in a TLS protected
+  // channel following a TLS handshake.
+  TLS = 2;
+
+  // If both sides advertise TLS_AUTHENTICATION_ONLY, this means that they
+  // agree that, after handshaking TLS, they will *not* wrap the connection
+  // in a TLS-protected channel. Instead, they will use TLS only for its
+  // handshake-based authentication.
+  //
+  // This is currently used for loopback connections only, so that compute
+  // frameworks which schedule for locality don't pay encryption overhead.
+  TLS_AUTHENTICATION_ONLY = 3;
+};
+
+// An authentication type. This is modeled as a oneof in case any of these
+// authentication types, or any authentication types in the future, need to add
+// extra type-specific parameters during negotiation.
+message AuthenticationTypePB {
+  // Currently empty: these messages carry no type-specific parameters yet and
+  // exist so that such parameters can be added later.
+  message Sasl {};
+  message Token {};
+  message Certificate {};
+
+  oneof type {
+    // The server and client mutually authenticate via SASL.
+    Sasl sasl = 1;
+
+    // The server authenticates the client via a signed token, and the client
+    // authenticates the server by verifying its certificate has been signed by
+    // a trusted CA.
+    //
+    // Token authentication requires the connection to be TLS encrypted.
+    Token token = 2;
+
+    // The server and client mutually authenticate by certificate.
+    //
+    // Certificate authentication requires the connection to be TLS encrypted.
+    Certificate certificate = 3;
+  }
+}
+
+// Message type passed back and forth during connection negotiation. Besides
+// the SASL steps, this also covers the initial NEGOTIATE exchange, the TLS
+// handshake, and the authentication token exchange (see NegotiateStep).
+message NegotiatePB {
+  // NOTE: these numeric values are part of the wire format and are not in
+  // protocol order (e.g. SASL_SUCCESS is 0); do not renumber them.
+  enum NegotiateStep {
+    UNKNOWN        = 999;
+    NEGOTIATE      = 1;
+    SASL_SUCCESS   = 0;
+    SASL_INITIATE  = 2;
+    SASL_CHALLENGE = 3;
+    SASL_RESPONSE  = 4;
+    TLS_HANDSHAKE  = 5;
+    TOKEN_EXCHANGE = 6;
+  }
+
+  message SaslMechanism {
+    // The SASL mechanism, e.g. 'PLAIN' or 'GSSAPI'.
+    required string mechanism = 2;
+
+    // Deprecated: no longer used. These field numbers should not be reused.
+    // optional string method = 1;
+    // optional bytes challenge = 5 [(REDACT) = true];
+  }
+
+  // When the client sends its NEGOTIATE step message, it sends its set of
+  // supported RPC system features. In the response to this message, the server
+  // sends back its own. This allows the two peers to agree on whether newer
+  // extensions of the RPC system may be used on this connection. We use a list
+  // of features rather than a simple version number to make it easier for the
+  // Java and C++ clients to implement features in different orders while still
+  // maintaining compatibility, as well as to simplify backporting of features
+  // out-of-order.
+  repeated RpcFeatureFlag supported_features = 1;
+
+  // The current negotiation step.
+  required NegotiateStep step  = 2;
+
+  // The SASL token, containing either the challenge during the SASL_CHALLENGE
+  // step, or the response during the SASL_RESPONSE step.
+  optional bytes token         = 3 [(REDACT) = true];
+
+  // During the TLS_HANDSHAKE step, contains the TLS handshake message.
+  optional bytes tls_handshake = 5 [(REDACT) = true];
+
+  // The tls-server-end-point channel bindings as specified in RFC 5929.  Sent
+  // from the server to the client during the SASL_SUCCESS step when the
+  // Kerberos (GSSAPI) SASL mechanism is used with TLS, in order to bind the
+  // Kerberos authenticated channel to the TLS channel. The value is integrity
+  // protected through SASL. The client is responsible for validating that the
+  // value matches the expected value.
+  optional bytes channel_bindings = 6 [(REDACT) = true];
+
+  // A random nonce sent from the server to the client during the SASL_SUCCESS
+  // step when the Kerberos (GSSAPI) SASL mechanism is used with TLS. The nonce
+  // must be sent back to the server, wrapped in SASL integrity protection, as
+  // part of the connection context (ConnectionContextPB.encoded_nonce).
+  optional bytes nonce = 9 [(REDACT) = true];
+
+  // During the NEGOTIATE step, contains the supported SASL mechanisms.
+  // During the SASL_INITIATE step, contains the single chosen SASL mechanism.
+  repeated SaslMechanism sasl_mechanisms = 4;
+
+  // During the client to server NEGOTIATE step, contains the supported authentication types.
+  // During the server to client NEGOTIATE step, contains the chosen authentication type.
+  repeated AuthenticationTypePB authn_types = 7;
+
+  // During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
+  optional security.SignedTokenPB authn_token = 8;
+}
+
+message RemoteMethodPB {
+  // Identifies the remote method invoked by an RPC
+  // (see RequestHeader.remote_method).
+  //
+  // Service name for the RPC layer.
+  // The client created a proxy with this service name.
+  // Example: kudu.rpc_test.CalculatorService
+  required string service_name = 1;
+
+  // Name of the RPC method.
+  required string method_name = 2;
+};
+
+// The id of a retriable RPC, whose results should be tracked on the server (see result_tracker.h).
+// This also includes some information that is useful for execution/garbage collection.
+message RequestIdPB {
+  // The (globally unique) id of the client performing this RPC.
+  required string client_id = 1;
+
+  // The (per-client unique) sequence number of this RPC.
+  required int64 seq_no = 2;
+
+  // The sequence number of the first RPC that has not been marked as completed by the client.
+  // Unset if there isn't an incomplete RPC.
+  // NOTE(review): this field is 'required', so it can never actually be unset on the wire;
+  // presumably a sentinel value is used when there is no incomplete RPC — confirm in the
+  // client and result tracker code.
+  required int64 first_incomplete_seq_no = 3;
+
+  // The number of times this RPC has been tried.
+  // Set to 1 in the first attempt.
+  required int64 attempt_no = 4;
+}
+
+// The header for the RPC request frame.
+message RequestHeader {
+  // A sequence number that uniquely identifies a call to a single remote server. This number is
+  // sent back in the response (ResponseHeader.call_id) and allows the response to be matched to
+  // the original request.
+  // Hadoop specifies a uint32 and casts it to a signed int. That is counterintuitive, so we use an
+  // int32 instead. Allowed values (inherited from Hadoop):
+  //   0 through INT32_MAX: Regular RPC call IDs.
+  //   -2: Invalid call ID.
+  //   -3: Connection context call ID.
+  //   -33: SASL negotiation call ID.
+  //
+  // NOTE: these call IDs must be increasing but may have gaps.
+  required int32 call_id = 3;
+
+  // RPC method being invoked.
+  // Not used for "connection setup" calls.
+  optional RemoteMethodPB remote_method = 6;
+
+  // Propagate the timeout as specified by the user. Note that, since there is some
+  // transit time between the client and server, if you wait exactly this amount of
+  // time and then respond, you are likely to cause a timeout on the client.
+  optional uint32 timeout_millis = 10;
+
+  // Feature flags that the service must support in order to properly interpret this
+  // request. The client can pass any set of flags, and if the server does not
+  // support one or more of them, it will fail the request.
+  //
+  // NOTE: these are for evolving features at the level of the application, not
+  // the RPC framework. Hence, we have to use a generic int type rather than a
+  // particular enum.
+  // NOTE: the server will only interpret this field if it supports the
+  // APPLICATION_FEATURE_FLAGS flag.
+  repeated uint32 required_feature_flags = 11;
+
+  // The unique id of this request, if it's retriable and if the results are to be tracked.
+  // The request id is unique per logical request, i.e. retries of the same RPC must have the
+  // same request id.
+  // Note that this is different from 'call_id' in that a call_id is unique to a server while a
+  // request_id is unique to a logical request (i.e. the request_id remains the same when a request
+  // is retried on a different server).
+  // Optional for requests that are naturally idempotent or to maintain compatibility with
+  // older clients for requests that are not.
+  optional RequestIdPB request_id = 15;
+
+  // Byte offsets for sidecars in the main body of the request message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 16;
+}
+
+message ResponseHeader {
+  // The header for the RPC response frame.
+  //
+  // The call_id of the RequestHeader that this response answers.
+  required int32 call_id = 1;
+
+  // If this is set, then this is an error response and the
+  // response message will be of type ErrorStatusPB instead of
+  // the expected response type.
+  optional bool is_error = 2 [ default = false ];
+
+  // Byte offsets for sidecars in the main body of the response message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 3;
+}
+
+// Sent as the response message when ResponseHeader.is_error is true.
+message ErrorStatusPB {
+
+  // These codes have all been inherited from Hadoop's RPC mechanism.
+  enum RpcErrorCodePB {
+    // Listed first, so it is the proto2 default value of an unset 'code' field.
+    FATAL_UNKNOWN = 10;
+
+    // Non-fatal RPC errors. Connection should be left open for future RPC calls.
+    //------------------------------------------------------------
+    // The application generated an error status. See the message field for
+    // more details.
+    ERROR_APPLICATION = 1;
+
+    // The specified method was not valid.
+    ERROR_NO_SUCH_METHOD = 2;
+
+    // The specified service was not valid.
+    ERROR_NO_SUCH_SERVICE = 3;
+
+    // The server is overloaded - the client should try again shortly.
+    ERROR_SERVER_TOO_BUSY = 4;
+
+    // The request parameter was not parseable, was missing required fields,
+    // or the server does not support the required feature flags.
+    ERROR_INVALID_REQUEST = 5;
+
+    // The server might have previously received this request but its response is no
+    // longer cached. It's unknown whether the request was executed or not.
+    ERROR_REQUEST_STALE = 6;
+
+    // The server is not able to complete the connection or request at this
+    // time. The client may try again later.
+    ERROR_UNAVAILABLE = 7;
+
+    // FATAL_* errors indicate that the client should shut down the connection.
+    //------------------------------------------------------------
+    // The RPC server is already shutting down.
+    FATAL_SERVER_SHUTTING_DOWN = 11;
+    // Fields of RpcHeader are invalid.
+    // NOTE(review): no message named RpcHeader exists in this file; this
+    // presumably refers to RequestHeader (name inherited from Hadoop) — confirm.
+    FATAL_INVALID_RPC_HEADER = 12;
+    // Could not deserialize RPC request.
+    FATAL_DESERIALIZING_REQUEST = 13;
+    // IPC Layer version mismatch.
+    FATAL_VERSION_MISMATCH = 14;
+    // Auth failed.
+    FATAL_UNAUTHORIZED = 15;
+
+    // The authentication token is invalid or expired;
+    // the client should obtain a new one.
+    FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
+  }
+
+  // Human-readable error message.
+  required string message = 1;
+
+  // TODO: Make code required?
+  optional RpcErrorCodePB code = 2;  // Specific error identifier.
+
+  // If the request failed due to an unsupported feature flag, the particular
+  // flag(s) that were not supported will be sent back to the client.
+  repeated uint32 unsupported_feature_flags = 3;
+
+  // Allow extensions. When the RPC returns ERROR_APPLICATION, the server
+  // should also fill in exactly one of these extension fields, which contains
+  // more details on the service-specific error.
+  extensions 100 to max;
+}
+
+extend google.protobuf.MethodOptions {
+  // An option for RPC methods that allows setting whether that method's
+  // RPC results should be tracked with a ResultTracker.
+  optional bool track_rpc_result = 50006 [default=false];
+
+  // An option to set the authorization method for this particular
+  // RPC method. If this is not specified, the service's 'default_authz_method'
+  // is used.
+  optional string authz_method = 50007;
+}
+
+extend google.protobuf.ServiceOptions {
+  // Set the default authorization method for the RPCs in this service.
+  // If this is not set, then the default authorization is to allow all
+  // RPCs.
+  //
+  // NOTE: field number 50007 is also used by the MethodOptions extension
+  // 'authz_method'. That is legal because extension numbers are scoped to the
+  // message being extended (ServiceOptions vs. MethodOptions).
+  optional string default_authz_method = 50007;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
new file mode 100644
index 0000000..9d2f9b5
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Protobuf used for introspection of RPC services (eg listing in-flight RPCs,
+// reflection, etc)
+syntax = "proto2";
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu";
+
+import "kudu/rpc/rpc_header.proto";
+
+message RpcCallInProgressPB {
+  // Describes a single RPC call that is currently in flight
+  // (see RpcConnectionPB.calls_in_flight).
+  //
+  // The request header of the call.
+  required RequestHeader header = 1;
+  // Presumably the call's trace output, populated only when traces are
+  // requested (DumpRunningRpcsRequestPB.include_traces) — confirm.
+  optional string trace_buffer = 2;
+  // Presumably the time elapsed since the call started, in microseconds
+  // (per the field name) — confirm.
+  optional uint64 micros_elapsed = 3;
+
+  enum State {
+    UNKNOWN = 999;
+
+    // States for OutboundCall
+    ON_OUTBOUND_QUEUE = 1;
+    SENDING = 2;
+    SENT = 3;
+    TIMED_OUT = 4;
+    FINISHED_ERROR = 5;
+    FINISHED_SUCCESS = 6;
+    NEGOTIATION_TIMED_OUT = 7;
+    FINISHED_NEGOTIATION_ERROR = 8;
+
+    // TODO(todd): add states for InboundCall
+  }
+
+  optional State state = 4;
+}
+
+message RpcConnectionPB {
+  // Describes one RPC connection and the calls currently in flight on it.
+  enum StateType {
+    UNKNOWN = 999;
+    NEGOTIATING = 0;  // Connection is still being negotiated.
+    OPEN = 1;         // Connection is active.
+  };
+
+  // IP address of the remote peer.
+  required string remote_ip = 1;
+  required StateType state = 2;
+  // TODO: swap out for separate fields
+  optional string remote_user_credentials = 3;
+  repeated RpcCallInProgressPB calls_in_flight = 4;
+}
+
+message DumpRunningRpcsRequestPB {
+  // Request to list the RPCs currently running on this process's connections.
+  //
+  // Whether to include per-call traces; presumably controls whether
+  // RpcCallInProgressPB.trace_buffer is populated — confirm.
+  optional bool include_traces = 1 [ default = false ];
+}
+
+message DumpRunningRpcsResponsePB {
+  // Inbound (server-side) connections and their in-flight calls.
+  repeated RpcConnectionPB inbound_connections = 1;
+  // Outbound (client-side) connections and their in-flight calls.
+  repeated RpcConnectionPB outbound_connections = 2;
+}
+
+//------------------------------------------------------------
+// Messages for sampled ("rpcz") RPC calls.
+
+// A particular TraceMetric key/value pair from a sampled RPC.
+message TraceMetricPB {
+  // A '.'-separated path through the parent-child trace hierarchy.
+  optional string child_path = 1;
+  // The metric name.
+  optional string key = 2;
+  // The metric value.
+  optional int64 value = 3;
+}
+
+// A single sampled RPC call.
+message RpczSamplePB {
+  // The original request header.
+  optional RequestHeader header = 1;
+  // The stringified request trace.
+  optional string trace = 2;
+  // The number of milliseconds that this call took to complete.
+  optional int32 duration_ms = 3;
+  // The metrics from the sampled trace.
+  repeated TraceMetricPB metrics = 4;
+}
+
+// A set of samples for a particular RPC method.
+message RpczMethodPB {
+  // The name of the sampled RPC method.
+  required string method_name = 1;
+  // The sampled calls of this method.
+  repeated RpczSamplePB samples = 2;
+}
+
+// Request and response for dumping previously sampled RPC calls.
+// The request currently takes no parameters.
+message DumpRpczStoreRequestPB {
+}
+message DumpRpczStoreResponsePB {
+  // One entry per sampled RPC method.
+  repeated RpczMethodPB methods = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_service.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_service.h b/be/src/kudu/rpc/rpc_service.h
new file mode 100644
index 0000000..dcaa9c1
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_service.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_SERVICE_H_
+#define KUDU_RPC_SERVICE_H_
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethod;
+struct RpcMethodInfo;
+class InboundCall;
+
+// Interface of a service that accepts inbound RPC calls for processing.
+// Instances are reference-counted via RefCountedThreadSafe.
+class RpcService : public RefCountedThreadSafe<RpcService> {
+ public:
+  virtual ~RpcService() = default;
+
+  // Enqueue a call for processing.
+  // On failure, the RpcService::QueueInboundCall() implementation is
+  // responsible for responding to the client with a failure message.
+  virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) = 0;
+
+  // Looks up per-method information for 'method'. The default implementation
+  // always returns nullptr; subclasses may override it. Presumably overrides
+  // also return nullptr for unknown methods — confirm with implementations.
+  //
+  // The parameter name is commented out because the default implementation
+  // does not use it (avoids -Wunused-parameter).
+  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& /*method*/) {
+    return nullptr;
+  }
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_SERVICE_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.cc b/be/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..580c6eb
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include "kudu/util/status.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// RpcSidecar backed by a caller-provided Slice. Nothing is copied or owned here:
+// the caller must keep the referenced bytes alive for at least as long as this
+// sidecar exists.
+class SliceSidecar : public RpcSidecar {
+ public:
+  explicit SliceSidecar(Slice data) : slice_(data) {}
+
+  Slice AsSlice() const override {
+    return slice_;
+  }
+
+ private:
+  // Non-owning view of the payload.
+  const Slice slice_;
+};
+
+// RpcSidecar that takes ownership of a faststring and exposes its contents
+// as a Slice for as long as the sidecar lives.
+class FaststringSidecar : public RpcSidecar {
+ public:
+  explicit FaststringSidecar(unique_ptr<faststring> buf)
+      : data_(std::move(buf)) {}
+
+  Slice AsSlice() const override {
+    const faststring& contents = *data_;
+    return contents;
+  }
+
+ private:
+  // Owned payload; the Slice returned by AsSlice() points into it.
+  const unique_ptr<faststring> data_;
+};
+
+// Creates a sidecar that owns 'data'.
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+  unique_ptr<RpcSidecar> sidecar(new FaststringSidecar(std::move(data)));
+  return sidecar;
+}
+
+// Creates a sidecar that merely references 'slice'; the caller keeps the
+// underlying memory alive.
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+  unique_ptr<RpcSidecar> sidecar(new SliceSidecar(slice));
+  return sidecar;
+}
+
+
+// Splits 'buffer' into the sidecar slices described by 'offsets' and stores them in
+// 'sidecars', starting at index 0. Sidecar i spans [offsets[i], offsets[i + 1]); the
+// last sidecar runs from its offset to the end of 'buffer'. The slices point into
+// 'buffer'; no data is copied.
+//
+// Returns Status::Corruption in three cases:
+//  - more than TransferLimits::kMaxSidecars offsets are given;
+//  - the offsets decrease;
+//  - an offset lies past the end of 'buffer'.
+// On error, some leading entries of 'sidecars' may already have been written.
+// An empty 'offsets' is accepted and leaves 'sidecars' untouched.
+Status RpcSidecar::ParseSidecars(
+    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+    Slice buffer, Slice* sidecars) {
+  if (offsets.size() == 0) return Status::OK();
+
+  int last = offsets.size() - 1;
+  if (last >= TransferLimits::kMaxSidecars) {
+    // Substitute() expands $N placeholders (not printf-style '%d'). Report the
+    // number of sidecars actually received, not the index of the last one.
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most $1",
+            offsets.size(), TransferLimits::kMaxSidecars));
+  }
+
+  // Offsets are unsigned 32-bit values. Comparing them as int64_t against a
+  // signed copy of the buffer length avoids signed/unsigned comparisons.
+  const int64_t buffer_size = buffer.size();
+
+  for (int i = 0; i < last; ++i) {
+    int64_t cur_offset = offsets.Get(i);
+    int64_t next_offset = offsets.Get(i + 1);
+    if (next_offset > buffer_size) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " has length $2, but the entire message has length $3",
+              i, cur_offset, (next_offset - cur_offset), buffer_size));
+    }
+    if (next_offset < cur_offset) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " but ends before that at offset $2.", i, cur_offset, next_offset));
+    }
+
+    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+  }
+
+  int64_t cur_offset = offsets.Get(last);
+  if (cur_offset > buffer_size) {
+    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+            "starts at offset $1 after message ends (message length $2).", last,
+            cur_offset, buffer_size));
+  }
+  sidecars[last] = Slice(buffer.data() + cur_offset, buffer_size - cur_offset);
+
+  return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.h b/be/src/kudu/rpc/rpc_sidecar.h
new file mode 100644
index 0000000..00d6e4b
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_SIDECAR_H
+#define KUDU_RPC_RPC_SIDECAR_H
+
+#include <google/protobuf/repeated_field.h>
+#include <memory>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+namespace rpc {
+
+// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
+// without extra copies. In other words, whenever a protobuf would have a large field
+// where additional copies become expensive, one may opt instead to use an RpcSidecar.
+//
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
+//
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the
+// RPC response on the server side, an index for that sidecar is returned. This index must
+// then in some way (e.g., via a protobuf field) be communicated to the recipient.
+//
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
+class RpcSidecar {
+ public:
+  // Returns a sidecar that takes ownership of 'data'.
+  static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+  // Returns a sidecar that only references 'slice'; the caller must keep the
+  // referenced memory alive for the lifetime of the sidecar.
+  static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
+
+  // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
+  // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
+  // will be filled from index 0. Sidecar i spans [offsets[i], offsets[i + 1]); the last
+  // one extends to the end of 'buffer'. Returns Status::Corruption if there are too many
+  // offsets, they decrease, or one lies past the end of 'buffer'; on error, some entries
+  // of 'sidecars' may already have been written.
+  // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+  static Status ParseSidecars(
+      const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+      Slice buffer, Slice* sidecars);
+
+  // Returns a Slice representation of the sidecar's data.
+  virtual Slice AsSlice() const = 0;
+  virtual ~RpcSidecar() { }
+};
+
+} // namespace rpc
+} // namespace kudu
+
+
+#endif /* KUDU_RPC_RPC_SIDECAR_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_stub-test.cc b/be/src/kudu/rpc/rpc_stub-test.cc
new file mode 100644
index 0000000..2fe0708
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_stub-test.cc
@@ -0,0 +1,679 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+#include <boost/bind.hpp>
+
+#include "kudu/gutil/stl_util.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/rpc/rtest.service.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/user.h"
+
+// Passed on the command line only to the child process that TestRpcPanic re-executes.
+DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic");
+// Defined elsewhere; TestShortRecvs sets it to inject short recv() reads.
+DECLARE_bool(socket_inject_short_recvs);
+
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+class RpcStubTest : public RpcTestBase {
+ public:
+  virtual void SetUp() OVERRIDE {
+    RpcTestBase::SetUp();
+    // A deliberately short service queue: several tests below must be able to
+    // saturate it with a modest number of client threads.
+    service_queue_length_ = 10;
+    StartTestServerWithGeneratedCode(&server_addr_);
+    client_messenger_ = CreateMessenger("Client");
+  }
+
+ protected:
+  // Issues a synchronous Add(10, 20) call and asserts that the result is 30.
+  void SendSimpleCall() {
+    AddRequestPB request;
+    request.set_x(10);
+    request.set_y(20);
+
+    CalculatorServiceProxy proxy(client_messenger_, server_addr_);
+    RpcController rpc;
+    AddResponsePB response;
+    ASSERT_OK(proxy.Add(request, &response, &rpc));
+    ASSERT_EQ(30, response.result());
+  }
+
+  // Address of the test server started in SetUp().
+  Sockaddr server_addr_;
+  // Messenger shared by the client-side proxies in these tests.
+  shared_ptr<Messenger> client_messenger_;
+};
+
+// One synchronous round trip through the generated proxy and service code.
+TEST_F(RpcStubTest, TestSimpleCall) {
+  NO_FATALS(SendSimpleCall());
+}
+
+// Regression test for a bug in which we would not properly parse a call
+// response when recv() returned a 'short read'. This injects such short
+// reads and then makes a number of calls.
+TEST_F(RpcStubTest, TestShortRecvs) {
+  FLAGS_socket_inject_short_recvs = true;
+
+  // SendSimpleCall() builds its own proxy for every call, so no proxy is
+  // needed in this scope.
+  for (int i = 0; i < 100; i++) {
+    NO_FATALS(SendSimpleCall());
+  }
+}
+
+// Test calls which are rather large.
+// This test sends many of them at once using the async API and then
+// waits for them all to return. This is meant to ensure that the
+// IO threads can deal with read/write calls that don't succeed
+// in sending the entire data in one go.
+TEST_F(RpcStubTest, TestBigCallData) {
+  const int kNumSentAtOnce = 20;
+  const size_t kMessageSize = 5 * 1024 * 1024;
+
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  // A single request carrying kMessageSize zero bytes, reused for every call.
+  EchoRequestPB req;
+  req.set_data(string(kMessageSize, '\0'));
+
+  // Per-call response and controller; they must stay alive until the latch fires.
+  vector<unique_ptr<EchoResponsePB>> resps;
+  vector<unique_ptr<RpcController>> controllers;
+
+  CountDownLatch latch(kNumSentAtOnce);
+  for (int sent = 0; sent < kNumSentAtOnce; ++sent) {
+    resps.emplace_back(new EchoResponsePB);
+    controllers.emplace_back(new RpcController);
+    EchoResponsePB* resp = resps.back().get();
+    RpcController* rpc = controllers.back().get();
+    p.EchoAsync(req, resp, rpc,
+                boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
+  }
+
+  latch.Wait();
+
+  for (size_t idx = 0; idx < controllers.size(); ++idx) {
+    ASSERT_OK(controllers[idx]->status());
+  }
+}
+
+// A Sleep call with the request's 'deferred' option set must still succeed.
+TEST_F(RpcStubTest, TestRespondDeferred) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  RpcController rpc;
+  SleepRequestPB request;
+  request.set_deferred(true);
+  request.set_sleep_micros(1000);
+  SleepResponsePB response;
+  ASSERT_OK(p.Sleep(request, &response, &rpc));
+}
+
+// Test that the default user credentials are propagated to the server.
+TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  string logged_in_user;
+  ASSERT_OK(GetLoggedInUser(&logged_in_user));
+
+  RpcController rpc;
+  WhoAmIRequestPB request;
+  WhoAmIResponsePB response;
+  ASSERT_OK(p.WhoAmI(request, &response, &rpc));
+
+  // Without explicit credentials the server sees the local login as the real
+  // user, and no effective user at all.
+  const auto& seen = response.credentials();
+  ASSERT_EQ(logged_in_user, seen.real_user());
+  ASSERT_FALSE(seen.has_effective_user());
+}
+
+// Test that the user can specify other credentials.
+TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
+  const char* const kFakeUserName = "some fake user";
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  UserCredentials fake_creds;
+  fake_creds.set_real_user(kFakeUserName);
+  p.set_user_credentials(fake_creds);
+
+  RpcController rpc;
+  WhoAmIRequestPB request;
+  WhoAmIResponsePB response;
+  ASSERT_OK(p.WhoAmI(request, &response, &rpc));
+
+  const auto& seen = response.credentials();
+  ASSERT_EQ(kFakeUserName, seen.real_user());
+  ASSERT_FALSE(seen.has_effective_user());
+}
+
+// Exercises the server-side per-user authorization rules: "alice" may call
+// nothing, while "bob" may call WhoAmI() but not Sleep().
+TEST_F(RpcStubTest, TestAuthorization) {
+  // "alice" is disallowed by all RPCs, including WhoAmI().
+  {
+    CalculatorServiceProxy alice_proxy(client_messenger_, server_addr_);
+    UserCredentials alice;
+    alice.set_real_user("alice");
+    alice_proxy.set_user_credentials(alice);
+
+    RpcController rpc;
+    WhoAmIRequestPB request;
+    WhoAmIResponsePB response;
+    Status status = alice_proxy.WhoAmI(request, &response, &rpc);
+    ASSERT_FALSE(status.ok());
+    ASSERT_EQ(status.ToString(),
+              "Remote error: Not authorized: alice is not allowed to call this method");
+  }
+
+  // "bob" is allowed some calls but not others.
+  {
+    CalculatorServiceProxy bob_proxy(client_messenger_, server_addr_);
+    UserCredentials bob;
+    bob.set_real_user("bob");
+    bob_proxy.set_user_credentials(bob);
+
+    // WhoAmI() is permitted for "bob".
+    {
+      RpcController rpc;
+      WhoAmIRequestPB request;
+      WhoAmIResponsePB response;
+      ASSERT_OK(bob_proxy.WhoAmI(request, &response, &rpc));
+    }
+
+    // Sleep() is not permitted for "bob".
+    {
+      RpcController rpc;
+      SleepRequestPB request;
+      request.set_sleep_micros(10);
+      SleepResponsePB response;
+      Status status = bob_proxy.Sleep(request, &response, &rpc);
+      ASSERT_EQ(status.ToString(),
+                "Remote error: Not authorized: bob is not allowed to call this method");
+    }
+  }
+}
+
+// Test that the user's remote address is accessible to the server.
+TEST_F(RpcStubTest, TestRemoteAddress) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  RpcController rpc;
+  WhoAmIRequestPB request;
+  WhoAmIResponsePB response;
+  ASSERT_OK(p.WhoAmI(request, &response, &rpc));
+
+  // The client connects over loopback, so the reported peer address should be
+  // 127.0.0.1 followed by some port.
+  ASSERT_STR_CONTAINS(response.address(), "127.0.0.1:");
+}
+
+////////////////////////////////////////////////////////////
+// Tests for error cases
+////////////////////////////////////////////////////////////
+
+// Test sending a PB parameter with a missing field, where the client
+// thinks it has sent a full PB. (eg due to version mismatch)
+TEST_F(RpcStubTest, TestCallWithInvalidParam) {
+  // A generic proxy lets us send a message type the generated stub would refuse.
+  Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
+
+  // AddRequestPartialPB has no 'y' field, so the server receives an incomplete Add request.
+  rpc_test::AddRequestPartialPB partial_req;
+  partial_req.set_x(rand());
+
+  AddResponsePB response;
+  RpcController rpc;
+  Status status = p.SyncRequest("Add", partial_req, &response, &rpc);
+  ASSERT_TRUE(status.IsRemoteError()) << "Bad status: " << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(),
+                      "Invalid argument: invalid parameter for call "
+                      "kudu.rpc_test.CalculatorService.Add: "
+                      "missing fields: y");
+}
+
+// Adapter that lets an atomic increment be bound as a void() RPC callback.
+// The value returned by the increment is intentionally discarded.
+static void DoIncrement(Atomic32* counter) {
+  const Atomic32 kDelta = 1;
+  base::subtle::Barrier_AtomicIncrement(counter, kDelta);
+}
+
+// Test sending a PB parameter with a missing field on the client side.
+// This also ensures that the async callback is only called once
+// (regression test for a previously-encountered bug).
+TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  RpcController rpc;
+  AddRequestPB request;
+  request.set_x(10);
+  // 'y' is deliberately left unset.
+  AddResponsePB response;
+  Atomic32 callbacks_run = 0;
+  p.AddAsync(request, &response, &rpc, boost::bind(&DoIncrement, &callbacks_run));
+
+  // Wait for the first callback, then linger briefly so that a second
+  // (erroneous) invocation would have a chance to show up.
+  while (NoBarrier_Load(&callbacks_run) == 0) {
+    SleepFor(MonoDelta::FromMicroseconds(10));
+  }
+  SleepFor(MonoDelta::FromMicroseconds(100));
+
+  ASSERT_EQ(1, NoBarrier_Load(&callbacks_run));
+  ASSERT_STR_CONTAINS(rpc.status().ToString(),
+                      "Invalid argument: invalid parameter for call "
+                      "kudu.rpc_test.CalculatorService.Add: missing fields: y");
+}
+
+// The server is asked to omit a required field from its response; the client
+// must reject that response.
+TEST_F(RpcStubTest, TestResponseWithMissingField) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  TestInvalidResponseRequestPB request;
+  request.set_error_type(
+      rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD);
+  TestInvalidResponseResponsePB response;
+  RpcController rpc;
+
+  Status status = p.TestInvalidResponse(request, &response, &rpc);
+  ASSERT_STR_CONTAINS(status.ToString(),
+                      "invalid RPC response, missing fields: response");
+}
+
+// Test case where the server responds with a message which is larger than the maximum
+// configured RPC message size. The server should send the response, but the client
+// will reject it.
+TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  TestInvalidResponseRequestPB request;
+  request.set_error_type(
+      rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE);
+  TestInvalidResponseResponsePB response;
+  RpcController rpc;
+
+  Status status = p.TestInvalidResponse(request, &response, &rpc);
+  ASSERT_STR_CONTAINS(status.ToString(), "Network error: RPC frame had a length of");
+}
+
+// Test sending a call which isn't implemented by the server.
+TEST_F(RpcStubTest, TestCallMissingMethod) {
+  Proxy generic_proxy(client_messenger_, server_addr_,
+                      CalculatorService::static_service_name());
+
+  Status status = DoTestSyncCall(generic_proxy, "DoesNotExist");
+  ASSERT_TRUE(status.IsRemoteError()) << "Bad status: " << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(), "with an invalid method name: DoesNotExist");
+}
+
+TEST_F(RpcStubTest, TestApplicationError) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  RpcController controller;
+  SleepRequestPB req;
+  SleepResponsePB resp;
+  req.set_sleep_micros(1);
+  req.set_return_app_error(true);
+  Status s = p.Sleep(req, &resp, &controller);
+  ASSERT_TRUE(s.IsRemoteError());
+  EXPECT_EQ("Remote error: Got some error", s.ToString());
+  EXPECT_EQ("message: \"Got some error\"\n"
+            "[kudu.rpc_test.CalculatorError.app_error_ext] {\n"
+            "  extra_error_data: \"some application-specific error data\"\n"
+            "}\n",
+            SecureDebugString(*controller.error_response()));
+}
+
+// Verifies that a service method which panics aborts the server process. The
+// parent re-executes this binary in panic-child mode and inspects the child's
+// stderr and wait status.
+TEST_F(RpcStubTest, TestRpcPanic) {
+  if (!FLAGS_is_panic_test_child) {
+    // This is a poor man's death test. We call this same
+    // test case, but set the above flag, and verify that
+    // it aborted. gtest death tests don't work here because
+    // there are already threads started up.
+    vector<string> argv;
+    string executable_path;
+    CHECK_OK(env_->GetExecutablePath(&executable_path));
+    argv.push_back(executable_path);
+    argv.push_back("--is_panic_test_child");
+    argv.push_back("--gtest_filter=RpcStubTest.TestRpcPanic");
+    Subprocess subp(argv);
+    subp.ShareParentStderr(false);
+    CHECK_OK(subp.Start());
+    // NOTE(review): 'in' is never fclose()d. The descriptor appears to be owned
+    // by 'subp'; confirm before adding fclose() to avoid a double close.
+    FILE* in = fdopen(subp.from_child_stderr_fd(), "r");
+    PCHECK(in);
+
+    // Search for string "Test method panicking!" somewhere in stderr
+    char buf[1024];
+    bool found_string = false;
+    while (fgets(buf, sizeof(buf), in)) {
+      if (strstr(buf, "Test method panicking!")) {
+        found_string = true;
+        break;
+      }
+    }
+    CHECK(found_string);
+
+    // Check return status. The child must not have finished normally: either it
+    // was killed by SIGABRT, or (on some systems) it exited with status 134
+    // rather than WIFSIGNALED getting flagged.
+    int wait_status = 0;
+    CHECK_OK(subp.Wait(&wait_status));
+    if (WIFSIGNALED(wait_status)) {
+      CHECK_EQ(WTERMSIG(wait_status), SIGABRT);
+    } else {
+      // WEXITSTATUS() is only meaningful when WIFEXITED() is true. Rejecting
+      // every normal exit up front would make this 134 case unreachable.
+      CHECK(WIFEXITED(wait_status)) << "unexpected wait status: " << wait_status;
+      CHECK_EQ(WEXITSTATUS(wait_status), 134);
+    }
+    return;
+  }
+
+  // Child process. Before forcing the panic, explicitly remove the test
+  // directory. This should be safe; this test doesn't generate any data.
+  CHECK_OK(env_->DeleteRecursively(test_dir_));
+
+  // Make an RPC which causes the server to abort.
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  RpcController controller;
+  PanicRequestPB req;
+  PanicResponsePB resp;
+  p.Panic(req, &resp, &controller);
+}
+
+// State for one asynchronous Sleep call: the controller, request and response
+// must outlive the call, and 'latch' is counted down by the completion callback.
+struct AsyncSleep {
+  RpcController rpc;
+  SleepRequestPB req;
+  SleepResponsePB resp;
+  CountDownLatch latch{1};
+};
+
+// A call whose deadline expires while it is still queued must be rejected with
+// TimedOut and never handed to a worker thread.
+TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  // Owns the in-flight async calls. Every latch is waited on below before any
+  // assertion can return early, so no AsyncSleep is destroyed while its RPC
+  // (and the callback that touches its latch) is still running.
+  vector<unique_ptr<AsyncSleep>> sleeps;
+
+  // Send enough sleep calls to occupy the worker threads.
+  for (int i = 0; i < n_worker_threads_; i++) {
+    sleeps.emplace_back(new AsyncSleep);
+    AsyncSleep* sleep = sleeps.back().get();
+    sleep->rpc.set_timeout(MonoDelta::FromSeconds(1));
+    sleep->req.set_sleep_micros(100*1000); // 100ms
+    p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc,
+                 boost::bind(&CountDownLatch::CountDown, &sleep->latch));
+  }
+
+  // We asynchronously sent the RPCs above, but the RPCs might still
+  // be in the queue. Because the RPC we send next has a lower timeout,
+  // it would take priority over the long-timeout RPCs. So, we have to
+  // wait until the above RPCs are being processed before we continue
+  // the test.
+  const Histogram* queue_time_metric = service_pool_->IncomingQueueTimeMetricForTests();
+  while (queue_time_metric->TotalCount() < n_worker_threads_) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+
+  // Send another call with a short timeout. This shouldn't get processed, because
+  // it'll get stuck in the queue for longer than its timeout.
+  RpcController rpc;
+  SleepRequestPB req;
+  SleepResponsePB resp;
+  req.set_sleep_micros(1000);
+  rpc.set_timeout(MonoDelta::FromMilliseconds(1));
+  Status s = p.Sleep(req, &resp, &rpc);
+
+  // Drain the long-running calls before asserting, so a failed ASSERT cannot
+  // free an AsyncSleep that is still in use.
+  for (const auto& pending : sleeps) {
+    pending->latch.Wait();
+  }
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Verify that the timedout call got short circuited before being processed.
+  const Counter* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests();
+  ASSERT_EQ(1, timed_out_in_queue->value());
+}
+
+// Test which ensures that the RPC queue accepts requests with the earliest
+// deadline first (EDF), and upon overflow rejects requests with the latest deadlines.
+//
+// In particular, this simulates a workload experienced with Impala where the local
+// impalad would spawn more scanner threads than the total number of handlers plus queue
+// slots, guaranteeing that some of those clients would see SERVER_TOO_BUSY rejections on
+// scan requests and be forced to back off and retry.  Without EDF scheduling, we saw that
+// the "unlucky" threads that got rejected would likely continue to get rejected upon
+// retries, and some would be starved continually until they missed their overall deadline
+// and failed the query.
+//
+// With EDF scheduling, the retries take priority over the original requests (because
+// they retain their original deadlines). This prevents starvation of unlucky threads.
+TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
+  const int num_client_threads = service_queue_length_ + n_worker_threads_ + 5;
+  vector<std::thread> threads;
+  // One counter per thread; each thread only writes its own element.
+  vector<int> successes(num_client_threads);
+  std::atomic<bool> done(false);
+  for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
+    threads.emplace_back([&, thread_id] {
+        Random rng(thread_id);
+        CalculatorServiceProxy p(client_messenger_, server_addr_);
+        while (!done.load()) {
+          // Set a deadline in the future. We'll keep using this same deadline
+          // on each of our retries.
+          MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(8);
+
+          for (int attempt = 1; !done.load(); attempt++) {
+            RpcController controller;
+            SleepRequestPB req;
+            SleepResponsePB resp;
+            controller.set_deadline(deadline);
+            req.set_sleep_micros(100000);
+            Status s = p.Sleep(req, &resp, &controller);
+            if (s.ok()) {
+              successes[thread_id]++;
+              break;
+            }
+            // We expect to get SERVER_TOO_BUSY errors because we have more clients than the
+            // server has handlers and queue slots. No other errors are expected.
+            CHECK(s.IsRemoteError() &&
+                  controller.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY)
+                << "Unexpected RPC failure: " << s.ToString();
+            // Randomized exponential backoff (similar to that done by the scanners in the Kudu
+            // client), capped at 1000ms. The exponent is bounded before shifting because
+            // '1 << attempt' is undefined behavior once 'attempt' reaches the width of int.
+            const int max_backoff_ms = attempt < 10 ? (1 << attempt) : 1000;
+            int backoff = (0.5 + rng.NextDoubleFraction() * 0.5) * max_backoff_ms;
+            VLOG(1) << "backoff " << backoff << "ms";
+            SleepFor(MonoDelta::FromMilliseconds(backoff));
+          }
+        }
+      });
+  }
+  // Let the threads run for 5 seconds before stopping them.
+  SleepFor(MonoDelta::FromSeconds(5));
+  done.store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // Before switching to earliest-deadline-first scheduling, the results
+  // here would typically look something like:
+  //  1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16 15
+  // With the fix, we see something like:
+  //  9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9
+  LOG(INFO) << "thread RPC success counts: " << successes;
+
+  int sum = 0;
+  int min = std::numeric_limits<int>::max();
+  for (int x : successes) {
+    sum += x;
+    min = std::min(min, x);
+  }
+  // 'successes' is non-empty (num_client_threads > 0); divide as signed ints.
+  const int avg = sum / static_cast<int>(successes.size());
+  ASSERT_GT(min, avg / 2)
+      << "expected the least lucky thread to have at least half as many successes "
+      << "as the average thread: min=" << min << " avg=" << avg;
+}
+
+// While a 100ms Sleep call is outstanding, both the client and server
+// messengers must report it through DumpRunningRpcs().
+TEST_F(RpcStubTest, TestDumpCallsInFlight) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  // NOTE(review): 'sleep' lives on the stack and is referenced by the in-flight
+  // call; a failing ASSERT below returns before sleep.latch.Wait() and could
+  // leave the callback touching freed memory. Consider waiting in a cleanup guard.
+  AsyncSleep sleep;
+  sleep.req.set_sleep_micros(100 * 1000); // 100ms
+  p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
+               boost::bind(&CountDownLatch::CountDown, &sleep.latch));
+
+  // Check the running RPC status on the client messenger.
+  DumpRunningRpcsRequestPB dump_req;
+  DumpRunningRpcsResponsePB dump_resp;
+  dump_req.set_include_traces(true);
+
+  ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+  LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
+  ASSERT_EQ(1, dump_resp.outbound_connections_size());
+  ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
+  ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0).
+                        header().remote_method().method_name());
+  ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);
+
+  // And the server messenger.
+  // We have to loop this until we find a result since the actual call is sent
+  // asynchronously off of the main thread (ie the server may not be handling it yet)
+  // The loop polls up to 100 times, 1ms apart; the assertions below report failure
+  // if the call still has not appeared.
+  for (int i = 0; i < 100; i++) {
+    dump_resp.Clear();
+    ASSERT_OK(server_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+    if (dump_resp.inbound_connections_size() > 0 &&
+        dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+
+  LOG(INFO) << "server messenger: " << SecureDebugString(dump_resp);
+  ASSERT_EQ(1, dump_resp.inbound_connections_size());
+  ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size());
+  ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0).
+                        header().remote_method().method_name());
+  ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);
+  // include_traces was requested, so the server-side trace should be present.
+  ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(),
+                      "Inserting onto call queue");
+  // Keep 'sleep' alive until the async call has completed.
+  sleep.latch.Wait();
+}
+
+// Two calls landing in different latency buckets must both be captured by the
+// server's rpcz store, together with their trace metrics.
+TEST_F(RpcStubTest, TestDumpSampledCalls) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  // Issue two calls that fall into different latency buckets.
+  AsyncSleep sleeps[2];
+  sleeps[0].req.set_sleep_micros(150 * 1000); // 150ms
+  sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms
+
+  for (auto& sleep : sleeps) {
+    p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
+                 boost::bind(&CountDownLatch::CountDown, &sleep.latch));
+  }
+  for (auto& sleep : sleeps) {
+    sleep.latch.Wait();
+  }
+
+  // Dump the sampled RPCs and expect to see the calls
+  // above.
+
+  DumpRpczStoreResponsePB sampled_rpcs;
+  server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs);
+  // gtest convention: expected value first, actual value second.
+  EXPECT_EQ(1, sampled_rpcs.methods_size());
+
+  // Render the dump once; every check below searches the same text.
+  const string dump = SecureDebugString(sampled_rpcs);
+  ASSERT_STR_CONTAINS(dump,
+                      "    metrics {\n"
+                      "      key: \"test_sleep_us\"\n"
+                      "      value: 150000\n"
+                      "    }\n");
+  ASSERT_STR_CONTAINS(dump,
+                      "    metrics {\n"
+                      "      key: \"test_sleep_us\"\n"
+                      "      value: 1500000\n"
+                      "    }\n");
+  ASSERT_STR_CONTAINS(dump,
+                      "    metrics {\n"
+                      "      child_path: \"test_child\"\n"
+                      "      key: \"related_trace_metric\"\n"
+                      "      value: 1\n"
+                      "    }");
+  ASSERT_STR_CONTAINS(dump, "SleepRequestPB");
+  ASSERT_STR_CONTAINS(dump, "duration_ms");
+}
+
+namespace {
+// Empty ref-counted type; TestCallbackClearedAfterRunning only inspects its
+// reference count.
+struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
+};
+
+// Test callback which takes a refcounted pointer.
+// We don't use this parameter, but it's used to validate that the bound callback
+// is cleared in TestCallbackClearedAfterRunning.
+void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) {
+  latch->CountDown();
+}
+} // anonymous namespace
+
+// Verify that, after a call has returned, no copy of the call's callback
+// is held. This is important when the callback holds a refcounted ptr,
+// since we expect to be able to release that pointer when the call is done.
+TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  CountDownLatch latch(1);
+  scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
+  RpcController rpc;
+  AddRequestPB request;
+  request.set_x(10);
+  request.set_y(20);
+  AddResponsePB response;
+  // The bound callback holds its own copy of 'my_refptr'.
+  p.AddAsync(request, &response, &rpc,
+             boost::bind(MyTestCallback, &latch, my_refptr));
+  latch.Wait();
+
+  // The callback's copy is released on another thread, possibly slightly after
+  // the latch fires, so poll (up to 100 x 1ms) for the count to drop back to 1.
+  int polls_left = 100;
+  while (polls_left-- > 0 && !my_refptr->HasOneRef()) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+  ASSERT_TRUE(my_refptr->HasOneRef());
+}
+
+// Regression test for KUDU-1409: if the client reactor thread is blocked (e.g due to a
+// process-wide pause or a slow callback) then we should not cause RPC calls to time out.
+TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
+  CHECK_EQ(client_messenger_->num_reactors(), 1)
+      << "This test requires only a single reactor. Otherwise the injected sleep might "
+      << "be scheduled on a different reactor than the RPC call.";
+
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  // Schedule a 1-second sleep on the reactor thread, starting 0.5s from now.
+  //
+  // This will cause the reactor to be blocked while the call response is received, and
+  // still be blocked when the timeout would normally occur. Despite this, the call should
+  // not time out.
+  //
+  //  0s         0.5s          1.2s     1.5s
+  //  RPC call running
+  //  |---------------------|
+  //              Reactor blocked in sleep
+  //             |----------------------|
+  //                            \_ RPC would normally time out
+
+  client_messenger_->ScheduleOnReactor([](const Status& s) {
+      // Blocking waits are otherwise restricted on reactor threads; allow this one.
+      ThreadRestrictions::ScopedAllowWait allow_wait;
+      SleepFor(MonoDelta::FromSeconds(1));
+    }, MonoDelta::FromSeconds(0.5));
+
+  // The server sleeps 0.8s, so the response arrives before the 1.2s timeout
+  // but while the reactor is still blocked.
+  RpcController controller;
+  SleepRequestPB req;
+  SleepResponsePB resp;
+  req.set_sleep_micros(800 * 1000);
+  controller.set_timeout(MonoDelta::FromMilliseconds(1200));
+  ASSERT_OK(p.Sleep(req, &resp, &controller));
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpcz_store.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpcz_store.cc b/be/src/kudu/rpc/rpcz_store.cc
new file mode 100644
index 0000000..66c23d0
--- /dev/null
+++ b/be/src/kudu/rpc/rpcz_store.cc
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpcz_store.h"
+
+#include <algorithm>
+#include <array>
+#include <glog/stl_logging.h>
+#include <mutex> // for unique_lock
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/trace.h"
+
+
+// When true, RpczStore::LogTrace() dumps the full trace of every completed call at
+// INFO level, unless the call was already logged as a WARNING for using most of its
+// client timeout. NOTE(review): the 'runtime' tag presumably marks the flag as safe to
+// change on a live process.
+DEFINE_bool(rpc_dump_all_traces, false,
+            "If true, dump all RPC traces at INFO level");
+TAG_FLAG(rpc_dump_all_traces, advanced);
+TAG_FLAG(rpc_dump_all_traces, runtime);
+
+using std::pair;
+using std::vector;
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Sample an RPC call once every N milliseconds within each
+// bucket. If the current sample in a latency bucket is older
+// than this threshold, a new sample will be taken.
+static const int kSampleIntervalMs = 1000;
+
+// Exclusive upper bounds, in milliseconds, of the latency buckets. A call goes
+// into the first bucket whose threshold exceeds its duration. Calls at or above
+// the last threshold go into one extra overflow bucket, hence the "+ 1". With the
+// values below the buckets are: < 10ms, 10-99ms, 100-999ms, >= 1000ms.
+static const int kBucketThresholdsMs[] = {10, 100, 1000};
+static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1;
+
+// An instance of this class is created for each RPC method implemented
+// on the server. It keeps several recent samples for each RPC, currently
+// based on fixed time buckets.
+//
+// Each call is assigned to one of kNumBuckets latency buckets. Each bucket holds
+// at most one sample, which is normally replaced only once it is older than
+// kSampleIntervalMs.
+class MethodSampler {
+ public:
+  MethodSampler() {}
+  ~MethodSampler() {}
+
+  // Potentially sample a single call.
+  // The call is recorded only if its bucket's current sample is older than
+  // kSampleIntervalMs and no other thread is concurrently sampling into that bucket.
+  void SampleCall(InboundCall* call);
+
+  // Dump the current samples.
+  // Appends one sample entry to 'pb' for every bucket that has ever been sampled.
+  void GetSamplePBs(RpczMethodPB* pb);
+
+ private:
+  // Convert the trace metrics from 't' into protobuf entries in 'sample_pb'.
+  // This function recurses through the parent-child relationship graph,
+  // keeping the current tree path in 'child_path' (empty at the root).
+  static void GetTraceMetrics(const Trace& t,
+                              const std::string& child_path,
+                              RpczSamplePB* sample_pb);
+
+  // An individual recorded sample.
+  // NOTE: SampleCall() aggregate-initializes this struct, so the member order
+  // (header, trace, duration_ms) is significant.
+  struct Sample {
+    RequestHeader header;
+    scoped_refptr<Trace> trace;
+    int duration_ms;
+  };
+
+  // A sample, including the particular time at which it was
+  // sampled, and a lock protecting it.
+  // 'last_sample_time' holds a GetMonoTimeMicros() value; 0 means "never sampled".
+  struct SampleBucket {
+    SampleBucket() : last_sample_time(0) {}
+
+    AtomicInt<int64_t> last_sample_time;
+    simple_spinlock sample_lock;
+    Sample sample;
+  };
+  std::array<SampleBucket, kNumBuckets> buckets_;
+
+  DISALLOW_COPY_AND_ASSIGN(MethodSampler);
+};
+
+// Returns the MethodSampler for the call's method, creating and registering one
+// on first use. Returns nullptr when the call carries no method info.
+MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
+  if (PREDICT_FALSE(!call->method_info())) {
+    return nullptr;
+  }
+
+  // Fast path: the sampler usually exists already, so look it up while holding
+  // only the shared side of the lock.
+  {
+    shared_lock<rw_spinlock> read_guard(samplers_lock_.get_lock());
+    const auto found = method_samplers_.find(call->method_info());
+    if (PREDICT_TRUE(found != method_samplers_.end())) {
+      return found->second.get();
+    }
+  }
+
+  // Slow path: build a candidate before taking the exclusive lock, then re-check,
+  // because another thread may have registered a sampler for this method between
+  // the two lock acquisitions.
+  unique_ptr<MethodSampler> candidate(new MethodSampler());
+  std::lock_guard<percpu_rwlock> write_guard(samplers_lock_);
+  const auto raced = method_samplers_.find(call->method_info());
+  if (raced == method_samplers_.end()) {
+    MethodSampler* const installed = candidate.get();
+    method_samplers_[call->method_info()] = std::move(candidate);
+    return installed;
+  }
+  // Lost the race: keep the existing sampler; 'candidate' is discarded on return.
+  return raced->second.get();
+}
+
+// Records 'call' as the current sample of its latency bucket if that bucket's sample
+// is older than kSampleIntervalMs. Never blocks: if another thread holds the
+// bucket's lock, this call is simply not sampled.
+void MethodSampler::SampleCall(InboundCall* call) {
+  const int duration_ms = call->timing().TotalDuration().ToMilliseconds();
+
+  // Bucket i collects calls shorter than kBucketThresholdsMs[i]. A call at or above
+  // every threshold falls into the final (overflow) bucket.
+  const int* const first_larger = std::upper_bound(std::begin(kBucketThresholdsMs),
+                                                   std::end(kBucketThresholdsMs),
+                                                   duration_ms);
+  SampleBucket* const bucket =
+      &buckets_[static_cast<size_t>(first_larger - std::begin(kBucketThresholdsMs))];
+
+  const MicrosecondsInt64 now = GetMonoTimeMicros();
+  if (now - bucket->last_sample_time.Load() <= kSampleIntervalMs * 1000) {
+    // This bucket was sampled recently enough.
+    return;
+  }
+
+  // Build the replacement outside the spinlock. After the swap it holds the previous
+  // sample, which is destroyed only after the lock has been released.
+  Sample replacement = {call->header(), call->trace(), duration_ms};
+  {
+    std::unique_lock<simple_spinlock> guard(bucket->sample_lock, std::try_to_lock);
+    if (!guard.owns_lock()) {
+      // If another thread is already taking a sample, it's not worth waiting.
+      return;
+    }
+    std::swap(bucket->sample, replacement);
+    bucket->last_sample_time.Store(now);
+  }
+  VLOG(1) << "Sampled call " << call->ToString();
+}
+
+// Appends one metric entry to 'sample_pb' for each (key, value) pair recorded on
+// 't'. Each entry is tagged with 'child_path' when that path is non-empty. The
+// function then recurses into every child trace, extending the path as
+// "<parent>.<child>".
+//
+// Uses std::string explicitly, matching the in-class declaration. This file does
+// not declare 'using std::string', so the unqualified name only compiled through a
+// using-declaration leaked from some other header.
+void MethodSampler::GetTraceMetrics(const Trace& t,
+                                    const std::string& child_path,
+                                    RpczSamplePB* sample_pb) {
+  // Metrics recorded directly on this trace.
+  auto m = t.metrics().Get();
+  for (const auto& e : m) {
+    auto* pb = sample_pb->add_metrics();
+    pb->set_key(e.first);
+    pb->set_value(e.second);
+    // Root-level metrics (empty path) leave child_path unset.
+    if (!child_path.empty()) {
+      pb->set_child_path(child_path);
+    }
+  }
+
+  // Recurse into child traces, building a dot-separated path of child names.
+  for (const auto& child_pair : t.ChildTraces()) {
+    std::string path = child_path;
+    if (!path.empty()) {
+      path += '.';
+    }
+    path += child_pair.first.ToString();
+    GetTraceMetrics(*child_pair.second.get(), path, sample_pb);
+  }
+}
+
+// Appends one sample entry to 'method_pb' per bucket that has been sampled at least
+// once. The entry holds the request header, the trace dump, per-trace metrics and
+// the call duration.
+void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) {
+  for (auto& b : buckets_) {
+    // A zero timestamp means this bucket has never been sampled.
+    if (b.last_sample_time.Load() != 0) {
+      std::lock_guard<simple_spinlock> guard(b.sample_lock);
+      const Sample& s = b.sample;
+      auto* sample_pb = method_pb->add_samples();
+      sample_pb->mutable_header()->CopyFrom(s.header);
+      sample_pb->set_trace(s.trace->DumpToString(Trace::INCLUDE_TIME_DELTAS));
+
+      GetTraceMetrics(*s.trace.get(), "", sample_pb);
+      sample_pb->set_duration_ms(s.duration_ms);
+    }
+  }
+}
+
+// Defined out of line: the header only forward-declares MethodSampler, while the
+// std::unique_ptr<MethodSampler> values in method_samplers_ need the complete type.
+RpczStore::RpczStore() = default;
+RpczStore::~RpczStore() = default;
+
+// Logs the call if warranted, then offers it to its method's sampler. Calls
+// without method info are logged but never sampled.
+void RpczStore::AddCall(InboundCall* call) {
+  LogTrace(call);
+  MethodSampler* const sampler = SamplerForCall(call);
+  if (PREDICT_TRUE(sampler != nullptr)) {
+    sampler->SampleCall(call);
+  }
+}
+
+// Fills 'resp' with one entry per known method and that method's current samples.
+// 'req' is currently unused.
+void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,
+                       DumpRpczStoreResponsePB* resp) {
+  // Snapshot the (method, sampler) pairs under the read lock, so the per-method
+  // dumping below runs without holding samplers_lock_.
+  vector<pair<RpcMethodInfo*, MethodSampler*>> snapshot;
+  {
+    shared_lock<rw_spinlock> read_guard(samplers_lock_.get_lock());
+    for (const auto& entry : method_samplers_) {
+      snapshot.emplace_back(entry.first, entry.second.get());
+    }
+  }
+
+  for (const auto& entry : snapshot) {
+    RpczMethodPB* method_pb = resp->add_methods();
+    // TODO: use the actual RPC name instead of the request type name.
+    // Currently this isn't conveniently plumbed here, but the type name
+    // is close enough.
+    method_pb->set_method_name(entry.first->req_prototype->GetTypeName());
+    entry.second->GetSamplePBs(method_pb);
+  }
+}
+
+// Logs slow calls in one of three ways:
+//  - a WARNING (plus the trace) when the call used more than 75% of its
+//    client-provided timeout;
+//  - otherwise, the full trace at INFO when --rpc_dump_all_traces is set;
+//  - otherwise, an INFO summary of trace metrics for calls slower than one second.
+void RpczStore::LogTrace(InboundCall* call) {
+  const int duration_ms = call->timing().TotalDuration().ToMilliseconds();
+
+  const bool client_timeout_known =
+      call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0;
+  if (client_timeout_known) {
+    const double log_threshold = call->header_.timeout_millis() * 0.75f;
+    if (duration_ms > log_threshold) {
+      // TODO: consider pushing this onto another thread since it may be slow.
+      // The traces may also be too large to fit in a log message.
+      LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout "
+                   << call->header_.timeout_millis() << ").";
+      const std::string trace_dump = call->trace()->DumpToString();
+      if (!trace_dump.empty()) {
+        LOG(WARNING) << "Trace:\n" << trace_dump;
+      }
+      // Already reported; skip the INFO-level logging below.
+      return;
+    }
+  }
+
+  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
+    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:";
+    call->trace()->Dump(&LOG(INFO), true);
+    return;
+  }
+
+  if (duration_ms > 1000) {
+    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
+              << "Request Metrics: " << call->trace()->MetricsAsJSON();
+  }
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpcz_store.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpcz_store.h b/be/src/kudu/rpc/rpcz_store.h
new file mode 100644
index 0000000..48e4474
--- /dev/null
+++ b/be/src/kudu/rpc/rpcz_store.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <memory>
+#include <unordered_map>
+
+#include "kudu/util/locks.h"
+
+namespace kudu {
+namespace rpc {
+
+class DumpRpczStoreRequestPB;
+class DumpRpczStoreResponsePB;
+class InboundCall;
+class MethodSampler;
+struct RpcMethodInfo;
+
+// Responsible for storing sampled traces associated with completed calls.
+// Before each call is responded to, it is added to this store.
+//
+// One MethodSampler is created lazily for each RpcMethodInfo, the first time a
+// call for that method is added.
+class RpczStore {
+ public:
+  RpczStore();
+  // Defined out of line, since MethodSampler is only forward-declared here.
+  ~RpczStore();
+
+  // Process a single call, potentially sampling it for later analysis.
+  //
+  // If the call is sampled, it might be mutated. For example, the request
+  // and response might be taken from the call and stored as part of the
+  // sample. This should be called just before a call response is sent
+  // to the client.
+  //
+  // NOTE(review): the current implementation copies the call's request header and
+  // keeps a reference to its trace rather than moving data out of the call. Confirm
+  // whether the mutation caveat above still applies. Calls without method info are
+  // passed to LogTrace() but never sampled.
+  void AddCall(InboundCall* c);
+
+  // Dump all of the collected RPC samples in response to a user query.
+  // 'req' is currently not consulted by the implementation.
+  void DumpPB(const DumpRpczStoreRequestPB& req,
+              DumpRpczStoreResponsePB* resp);
+
+ private:
+  // Look up or create the particular MethodSampler instance which should
+  // store samples for this call. Returns nullptr if the call has no method info.
+  MethodSampler* SamplerForCall(InboundCall* call);
+
+  // Log a WARNING message if the RPC response was slow enough that the
+  // client likely timed out. This is based on the client-provided timeout
+  // value.
+  // Also can be configured to log _all_ RPC traces for help debugging.
+  void LogTrace(InboundCall* call);
+
+  percpu_rwlock samplers_lock_;
+
+  // Protected by samplers_lock_. Keyed by method; values are owned samplers.
+  std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_;
+
+  DISALLOW_COPY_AND_ASSIGN(RpczStore);
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rtest.proto b/be/src/kudu/rpc/rtest.proto
new file mode 100644
index 0000000..1ef5ca6
--- /dev/null
+++ b/be/src/kudu/rpc/rtest.proto
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Test protocol for kudu RPC.
+syntax = "proto2";
+package kudu.rpc_test;
+
+import "kudu/rpc/rpc_header.proto";
+import "kudu/rpc/rtest_diff_package.proto";
+
+// Request for CalculatorService.Add: the two operands.
+message AddRequestPB {
+  required uint32 x = 1;
+  required uint32 y = 2;
+}
+
+// Used by tests to simulate an old client which is missing
+// a newly added required field.
+// It mirrors AddRequestPB field 1 ('x') but lacks the required field 2 ('y').
+message AddRequestPartialPB {
+  required uint32 x = 1;
+}
+
+// Response for CalculatorService.Add.
+message AddResponsePB {
+  // Presumably x + y; see the service implementation.
+  required uint32 result = 1;
+}
+
+// Request for CalculatorService.Sleep.
+message SleepRequestPB {
+  // Requested server-side sleep duration, in microseconds.
+  required uint32 sleep_micros = 1;
+
+  // Used in rpc_stub-test: if this is true, it will respond from a different
+  // thread than the one that receives the request.
+  optional bool deferred = 2 [ default = false ];
+
+  // If set, returns a CalculatorError response.
+  optional bool return_app_error = 3 [ default = false ];
+
+  // Used in rpc-test: if this is set to true and no client timeout is set,
+  // the service will respond to the client with an error.
+  optional bool client_timeout_defined = 4 [ default = false ];
+}
+
+// Empty response for CalculatorService.Sleep.
+message SleepResponsePB {
+}
+
+// Ask the server to send two strings back in sidecars. NOTE(review): these messages
+// are not referenced by CalculatorService in this file; presumably they are served
+// by a separate test service. Presumably 'size1'/'size2' are the string lengths and
+// 'random_seed' seeds their contents; verify against the implementation.
+message SendTwoStringsRequestPB {
+  required uint32 random_seed = 1;
+  required uint64 size1 = 2;
+  required uint64 size2 = 3;
+}
+
+// Presumably the sidecar indices under which the two strings were attached.
+message SendTwoStringsResponsePB {
+  required uint32 sidecar1 = 1;
+  required uint32 sidecar2 = 2;
+}
+
+// Push two strings to the server as part of the request, in sidecars.
+message PushTwoStringsRequestPB {
+  required uint32 sidecar1_idx = 1;
+  required uint32 sidecar2_idx = 2;
+}
+
+// Presumably the sizes and contents of both sidecars as received by the server,
+// echoed back so the test can verify them.
+message PushTwoStringsResponsePB {
+  required uint32 size1 = 1;
+  required string data1 = 2;
+  required uint32 size2 = 3;
+  required string data2 = 4;
+}
+
+// Request/response for CalculatorService.Echo; 'data' is presumably returned unchanged.
+message EchoRequestPB {
+  required string data = 1;
+}
+message EchoResponsePB {
+  required string data = 1;
+}
+
+// Request/response for CalculatorService.WhoAmI. The response presumably reports
+// the caller's credentials and address as seen by the server.
+message WhoAmIRequestPB {
+}
+message WhoAmIResponsePB {
+  required kudu.rpc.UserInformationPB credentials = 1;
+  required string address = 2;
+}
+
+// Application-specific error detail, attached to kudu.rpc.ErrorStatusPB through
+// extension field 101. SleepRequestPB.return_app_error asks for one of these.
+message CalculatorError {
+  extend kudu.rpc.ErrorStatusPB {
+    optional CalculatorError app_error_ext = 101;
+  }
+
+  required string extra_error_data = 1;
+}
+
+// Field-less request/response for CalculatorService.Panic.
+message PanicRequestPB {}
+message PanicResponsePB {}
+
+// Request for CalculatorService.TestInvalidResponse: selects which kind of invalid
+// response the server should produce (presumably one missing a required field, or
+// one exceeding the allowed size, per the enum names).
+message TestInvalidResponseRequestPB {
+  enum ErrorType {
+    MISSING_REQUIRED_FIELD = 1;
+    RESPONSE_TOO_LARGE = 2;
+  }
+  required ErrorType error_type = 1;
+}
+
+// Response for CalculatorService.TestInvalidResponse.
+message TestInvalidResponseResponsePB {
+  required bytes response = 1;
+}
+
+// RPC feature flags for tests. UNKNOWN is listed first, so it is the proto2
+// default value; FOO is presumably a placeholder feature.
+enum FeatureFlags {
+  UNKNOWN=0;
+  FOO=1;
+}
+
+// Request for CalculatorService.AddExactlyOnce, whose results are tracked
+// (track_rpc_result). Presumably 'value_to_add' is added to a server-side
+// counter after an optional 'sleep_for_ms' delay, and 'randomly_fail' injects
+// failures; verify against the service implementation.
+message ExactlyOnceRequestPB {
+  optional uint32 sleep_for_ms = 1 [default = 0];
+  required uint32 value_to_add = 2;
+  optional bool randomly_fail = 3 [default = false];
+}
+// Response for CalculatorService.AddExactlyOnce: presumably the counter value
+// after the add, and the server time in microseconds.
+message ExactlyOnceResponsePB {
+  required uint32 current_val = 1;
+  required fixed64 current_time_micros = 2;
+}
+
+// Test service used by the RPC unit tests.
+service CalculatorService {
+  // Authorization method applied to methods that do not set their own
+  // (presumably rejects the user "alice", per its name).
+  option (kudu.rpc.default_authz_method) = "AuthorizeDisallowAlice";
+
+  rpc Add(AddRequestPB) returns(AddResponsePB);
+  // Overrides the default authorization method.
+  rpc Sleep(SleepRequestPB) returns(SleepResponsePB) {
+    option (kudu.rpc.authz_method) = "AuthorizeDisallowBob";
+  };
+  rpc Echo(EchoRequestPB) returns(EchoResponsePB);
+  rpc WhoAmI(WhoAmIRequestPB) returns (WhoAmIResponsePB);
+  // Request/response types live in another package (rtest_diff_package.proto).
+  rpc TestArgumentsInDiffPackage(kudu.rpc_test_diff_package.ReqDiffPackagePB)
+    returns(kudu.rpc_test_diff_package.RespDiffPackagePB);
+  rpc Panic(PanicRequestPB) returns (PanicResponsePB);
+  // Opts into RPC result tracking for exactly-once semantics.
+  rpc AddExactlyOnce(ExactlyOnceRequestPB) returns (ExactlyOnceResponsePB) {
+    option (kudu.rpc.track_rpc_result) = true;
+  }
+  rpc TestInvalidResponse(TestInvalidResponseRequestPB) returns (TestInvalidResponseResponsePB);
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rtest_diff_package.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rtest_diff_package.proto b/be/src/kudu/rpc/rtest_diff_package.proto
new file mode 100644
index 0000000..f6f9b60
--- /dev/null
+++ b/be/src/kudu/rpc/rtest_diff_package.proto
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Request/Response in different package to test that RPC methods
+// handle arguments with packages different from the service itself.
+syntax = "proto2";
+package kudu.rpc_test_diff_package;
+
+// Empty request/response for CalculatorService.TestArgumentsInDiffPackage
+// (declared in rtest.proto), which exercises argument types from another package.
+message ReqDiffPackagePB {
+}
+message RespDiffPackagePB {
+}


Mime
View raw message