kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [3/3] kudu git commit: [rpc] introduce per-RPC credentials policy
Date Tue, 16 May 2017 20:22:28 GMT
[rpc] introduce per-RPC credentials policy

This patch introduces a policy for RPC authentication credentials.  The
authentication credentials policy allows for control over the type of
client-side credentials used for making a remote procedure call.

The idea behind this change is simple: sometimes the server's behavior
depends on the type of client's credentials used to authenticate the
client to the server in the context of the remote procedure call.  If
the client expects some particular behavior from the server, it has to
explicitly specify the type of credentials it wants to use for the call.

One example of an RPC depending on the type of the specified credentials
is MasterService::ConnectToMaster().  It's impossible to receive an
authentication token from the master if calling that method over a
connection established with an authn token.  To get a new authn token
in that case, it's necessary to open a new connection to the master
using a type of credentials other than an authn token (e.g., Kerberos
credentials or a TLS certificate will work).

In other words, derived/secondary authentication credentials
(such as authn token) can only be acquired if using the primary ones.
That's a crucial restriction to allow for enforcing expiration
of derived/secondary credentials.  With this patch, a client is able
to re-acquire secondary authentication credentials (authn token)
regardless of the type of credentials used to establish the current
connection to the Kudu master.

As a part of this patch, a new unit test is added to cover the new
functionality.

Change-Id: I52f806e7b6f6362f66148530124e748e199ae6c2
Reviewed-on: http://gerrit.cloudera.org:8080/6875
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>
Reviewed-by: Dan Burkert <danburkert@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58f1a216
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58f1a216
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58f1a216

Branch: refs/heads/master
Commit: 58f1a21696aa5b65ad1e5086cb5d6800f50fe367
Parents: b492d8e
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri May 12 00:25:27 2017 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Tue May 16 20:21:24 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc     |  16 +++-
 src/kudu/rpc/connection.h      |  53 +++++++++++++-
 src/kudu/rpc/messenger.h       |   3 +-
 src/kudu/rpc/negotiation.cc    |  12 ++-
 src/kudu/rpc/outbound_call.h   |   2 +-
 src/kudu/rpc/proxy.cc          |   6 +-
 src/kudu/rpc/reactor.cc        | 141 ++++++++++++++++++++++++------------
 src/kudu/rpc/reactor.h         |  13 +++-
 src/kudu/rpc/rpc-test-base.h   |  10 ++-
 src/kudu/rpc/rpc-test.cc       |  72 ++++++++++++++++++
 src/kudu/rpc/rpc_controller.cc |  11 ++-
 src/kudu/rpc/rpc_controller.h  |  37 +++++++++-
 src/kudu/rpc/rpc_stub-test.cc  |   2 +-
 13 files changed, 304 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 66aecd2..912377c 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -21,6 +21,7 @@
 
 #include <algorithm>
 #include <iostream>
+#include <memory>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -66,7 +67,8 @@ namespace rpc {
 Connection::Connection(ReactorThread *reactor_thread,
                        Sockaddr remote,
                        unique_ptr<Socket> socket,
-                       Direction direction)
+                       Direction direction,
+                       CredentialsPolicy policy)
     : reactor_thread_(reactor_thread),
       remote_(remote),
       socket_(std::move(socket)),
@@ -74,7 +76,9 @@ Connection::Connection(ReactorThread *reactor_thread,
       last_activity_time_(MonoTime::Now()),
       is_epoll_registered_(false),
       next_call_id_(1),
-      negotiation_complete_(false) {
+      credentials_policy_(policy),
+      negotiation_complete_(false),
+      scheduled_for_shutdown_(false) {
 }
 
 Status Connection::SetNonBlocking(bool enabled) {
@@ -435,6 +439,12 @@ void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
   reactor_thread_->reactor()->ScheduleReactorTask(task);
 }
 
+bool Connection::SatisfiesCredentialsPolicy(CredentialsPolicy policy) const {
+  DCHECK_EQ(direction_, CLIENT);
+  return (policy == CredentialsPolicy::ANY_CREDENTIALS) ||
+      (policy == credentials_policy_);
+}
+
 RpczStore* Connection::rpcz_store() {
   return reactor_thread_->reactor()->messenger()->rpcz_store();
 }
@@ -647,7 +657,7 @@ class NegotiationCompletedTask : public ReactorTask {
 
  private:
   scoped_refptr<Connection> conn_;
-  Status negotiation_status_;
+  const Status negotiation_status_;
 };
 
 void Connection::CompleteNegotiation(const Status& negotiation_status) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index e165c29..e75b7e6 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -18,8 +18,7 @@
 #ifndef KUDU_RPC_CONNECTION_H
 #define KUDU_RPC_CONNECTION_H
 
-#include <stdint.h>
-
+#include <cstdint>
 #include <limits>
 #include <memory>
 #include <set>
@@ -35,6 +34,7 @@
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
@@ -49,6 +49,7 @@ class DumpRunningRpcsRequestPB;
 class RpcConnectionPB;
 class ReactorThread;
 class RpczStore;
+enum class CredentialsPolicy;
 
 //
 // A connection between an endpoint and us.
@@ -84,7 +85,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   Connection(ReactorThread *reactor_thread,
              Sockaddr remote,
              std::unique_ptr<Socket> socket,
-             Direction direction);
+             Direction direction,
+             CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);
 
   // Set underlying socket to non-blocking (or blocking) mode.
   Status SetNonBlocking(bool enabled);
@@ -133,6 +135,20 @@ class Connection : public RefCountedThreadSafe<Connection> {
     return local_user_credentials_;
   }
 
+  // Credentials policy to start connection negotiation.
+  CredentialsPolicy credentials_policy() const { return credentials_policy_; }
+
+  // Whether the connection satisfies the specified credentials policy.
+  //
+  // NOTE: The policy is set prior to connection negotiation, and the actual
+  //       authentication credentials used for connection negotiation might
+  //       effectively make the connection to satisfy a stronger policy.
+  //       An example: the credentials policy for the connection was set to
+  //       ANY_CREDENTIALS, but since the authn token was not available
+  //       at the time of negotiation, the primary credentials were used, making
+  //       the connection de facto satisfying the PRIMARY_CREDENTIALS policy.
+  bool SatisfiesCredentialsPolicy(CredentialsPolicy policy) const;
+
   RpczStore* rpcz_store();
 
   // libev callback when data is available to read.
@@ -181,6 +197,19 @@ class Connection : public RefCountedThreadSafe<Connection> {
     return remote_user_;
   }
 
+  // Whether the connection is scheduled for shutdown.
+  bool scheduled_for_shutdown() const {
+    DCHECK_EQ(direction_, CLIENT);
+    return scheduled_for_shutdown_;
+  }
+
+  // Mark the connection as scheduled to be shut down. Reactor does not dispatch
+  // new calls on such a connection.
+  void set_scheduled_for_shutdown() {
+    DCHECK_EQ(direction_, CLIENT);
+    scheduled_for_shutdown_ = true;
+  }
+
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;
@@ -300,8 +329,26 @@ class Connection : public RefCountedThreadSafe<Connection> {
   ObjectPool<CallAwaitingResponse> car_pool_;
   typedef ObjectPool<CallAwaitingResponse>::scoped_ptr scoped_car;
 
+  // The credentials policy to use for connection negotiation. It defines which
+  // type of user credentials used to negotiate a connection. The actual type of
+  // credentials used for authentication during the negotiation process depends
+  // on the credentials availability, but the result credentials guaranteed to
+  // always satisfy the specified credentials policy. In other words, the actual
+  // type of credentials used for connection negotiation might effectively make
+  // the connection to satisfy a stronger/narrower policy.
+  //
+  // An example:
+  //   The credentials policy for the connection was set to ANY_CREDENTIALS,
+  //   but since no secondary credentials (such authn token) were available
+  //   at the time of negotiation, the primary credentials were used,making the
+  //   connection satisfying the PRIMARY_CREDENTIALS policy de facto.
+  const CredentialsPolicy credentials_policy_;
+
   // Whether we completed connection negotiation.
   bool negotiation_complete_;
+
+  // Whether the connection is scheduled for shutdown.
+  bool scheduled_for_shutdown_;
 };
 
 } // namespace rpc

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 7f2974d..1ba76a7 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -233,7 +233,7 @@ class Messenger {
 
   int num_reactors() const { return reactors_.size(); }
 
-  std::string name() const {
+  const std::string& name() const {
     return name_;
   }
 
@@ -248,6 +248,7 @@ class Messenger {
 
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestCredentialsPolicy);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 
   explicit Messenger(const MessengerBuilder &bld);

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index db742ca..2bc5ea3 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -158,9 +158,12 @@ static Status DoClientNegotiation(Connection* conn,
                                   RpcEncryption encryption,
                                   MonoTime deadline) {
   const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  // Prefer secondary credentials (such as authn token) if permitted by policy.
+  const auto authn_token = (conn->credentials_policy() == CredentialsPolicy::PRIMARY_CREDENTIALS)
+      ? boost::none : messenger->authn_token();
   ClientNegotiation client_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
-                                       messenger->authn_token(),
+                                       authn_token,
                                        encryption);
 
   // Note that the fqdn is an IP address here: we've already lost whatever DNS
@@ -186,7 +189,7 @@ static Status DoClientNegotiation(Connection* conn,
       }
 
       if (authentication == RpcAuthentication::REQUIRED &&
-          !messenger->authn_token() &&
+          !authn_token &&
           !messenger->tls_context().has_signed_cert()) {
         return Status::InvalidArgument(
             "Kerberos, token, or PKI certificate credentials must be provided in order to "
@@ -210,6 +213,11 @@ static Status DoClientNegotiation(Connection* conn,
   conn->adopt_socket(client_negotiation.release_socket());
   conn->set_remote_features(client_negotiation.take_server_features());
 
+  // Sanity check: if no authn token was supplied as user credentials,
+  // the negotiated authentication type cannot be AuthenticationType::TOKEN.
+  DCHECK(!(authn_token == boost::none &&
+           client_negotiation.negotiated_authn() == AuthenticationType::TOKEN));
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 2b1b5ad..ee7bc64 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -258,7 +258,7 @@ class OutboundCall {
   // RPC-system features required to send this call.
   std::set<RpcFeatureFlag> required_rpc_features_;
 
-  ConnectionId conn_id_;
+  const ConnectionId conn_id_;
   ResponseCallback callback_;
   RpcController* controller_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 077af58..45ad5dd 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -76,11 +76,11 @@ void Proxy::AsyncRequest(const string& method,
                          google::protobuf::Message* response,
                          RpcController* controller,
                          const ResponseCallback& callback) const {
-  CHECK(controller->call_.get() == nullptr) << "Controller should be reset";
+  CHECK(!controller->call_) << "Controller should be reset";
   base::subtle::NoBarrier_Store(&is_started_, true);
   RemoteMethod remote_method(service_name_, method);
-  OutboundCall* call = new OutboundCall(conn_id_, remote_method, response, controller, callback);
-  controller->call_.reset(call);
+  controller->call_.reset(
+      new OutboundCall(conn_id_, remote_method, response, controller, callback));
   controller->SetRequestParam(req);
 
   // If this fails to queue, the callback will get called immediately

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 6d6a5d4..525980b 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -24,15 +24,18 @@
 #include <sys/types.h>
 #include <unistd.h>
 
+#include <memory>
 #include <mutex>
 #include <string>
 
 #include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
 #include <ev++.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/messenger.h"
@@ -64,6 +67,8 @@ static const int kDefaultLibEvFlags = ev::AUTO;
 
 using std::string;
 using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
 
 // TODO(KUDU-1580). This timeout has been bumped from 3 seconds up to
 // 15 seconds to workaround a bug. We should drop it back down when
@@ -189,7 +194,7 @@ Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
   for (const scoped_refptr<Connection>& conn : server_conns_) {
     RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
   }
-  for (const conn_map_t::value_type& entry : client_conns_) {
+  for (const conn_multimap_t::value_type& entry : client_conns_) {
     Connection* conn = entry.second.get();
     RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections()));
   }
@@ -244,7 +249,9 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   DCHECK(IsCurrentThread());
   scoped_refptr<Connection> conn;
 
-  Status s = FindOrStartConnection(call->conn_id(), &conn);
+  Status s = FindOrStartConnection(call->conn_id(),
+                                   call->controller()->credentials_policy(),
+                                   &conn);
   if (PREDICT_FALSE(!s.ok())) {
     call->SetFailed(s);
     return;
@@ -278,36 +285,49 @@ void ReactorThread::RegisterTimeout(ev::timer *watcher) {
 
 void ReactorThread::ScanIdleConnections() {
   DCHECK(IsCurrentThread());
-  // enforce TCP connection timeouts
-  auto c = server_conns_.begin();
-  auto c_end = server_conns_.end();
+  // Enforce TCP connection timeouts: server-side connections.
+  const auto server_conns_end = server_conns_.end();
   uint64_t timed_out = 0;
-  for (; c != c_end; ) {
-    const scoped_refptr<Connection>& conn = *c;
+  for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+    Connection* conn = it->get();
     if (!conn->Idle()) {
       VLOG(10) << "Connection " << conn->ToString() << " not idle";
-      ++c; // TODO(todd): clean up this loop
+      ++it;
+      continue;
+    }
+
+    const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+    if (connection_delta <= connection_keepalive_time_) {
+      ++it;
       continue;
     }
 
-    MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
-    if (connection_delta > connection_keepalive_time_) {
+    conn->Shutdown(Status::NetworkError(
+        Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+    VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+            << connection_delta.ToString();
+    ++timed_out;
+    it = server_conns_.erase(it);
+  }
+
+  // Take care of idle client-side connections marked for shutdown.
+  uint64_t shutdown = 0;
+  for (auto it = client_conns_.begin(); it != client_conns_.end();) {
+    Connection* conn = it->second.get();
+    if (conn->scheduled_for_shutdown() && conn->Idle()) {
       conn->Shutdown(Status::NetworkError(
-                       StringPrintf("connection timed out after %s seconds",
-                                    connection_keepalive_time_.ToString().c_str())));
-      VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
-              << connection_delta.ToSeconds() << "s";
-      server_conns_.erase(c++);
-      ++timed_out;
+          "connection has been marked for shutdown"));
+      it = client_conns_.erase(it);
+      ++shutdown;
     } else {
-      ++c;
+      ++it;
     }
   }
-
-  // TODO: above only times out on the server side.
-  // Clients may want to set their keepalive timeout as well.
+  // TODO(aserbin): clients may want to set their keepalive timeout for idle
+  //                but not scheduled for shutdown connections.
 
   VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
+  VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections.";
 }
 
 const std::string& ReactorThread::name() const {
@@ -339,25 +359,49 @@ void ReactorThread::RunThread() {
 }
 
 Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+                                            CredentialsPolicy cred_policy,
                                             scoped_refptr<Connection>* conn) {
   DCHECK(IsCurrentThread());
-  conn_map_t::const_iterator it = client_conns_.find(conn_id);
-  if (it != client_conns_.end()) {
-    const auto& c = it->second;
-    // Regular mode: reuse the connection to the same server.
-    if (PREDICT_TRUE(!FLAGS_rpc_reopen_outbound_connections)) {
-      *conn = c;
-      return Status::OK();
-    }
-
-    // Kind of 'one-connection-per-RPC' mode: reopen the idle connection.
-    if (!c->Idle()) {
-      *conn = c;
-      return Status::OK();
+  const auto range = client_conns_.equal_range(conn_id);
+  scoped_refptr<Connection> found_conn;
+  for (auto it = range.first; it != range.second;) {
+    const auto& c = it->second.get();
+    // * Do not use connections scheduled for shutdown to place new calls.
+    //
+    // * Do not use a connection with a non-compliant credentials policy.
+    //   Instead, open a new one, while marking the former as scheduled for
+    //   shutdown. This process converges: any connection that satisfies the
+    //   PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS
+    //   policy as well. The idea is to keep only one usable connection
+    //   identified by the specified 'conn_id'.
+    //
+    // * If the test-only 'one-connection-per-RPC' mode is enabled, connections
+    //   are re-established at every RPC call.
+    if (c->scheduled_for_shutdown() ||
+        !c->SatisfiesCredentialsPolicy(cred_policy) ||
+        PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) {
+      if (c->Idle()) {
+        // Shutdown idle connections to the target destination. Non-idle ones
+        // will be taken care of later by the idle connection scanner.
+        DCHECK_EQ(Connection::CLIENT, c->direction());
+        c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+        it = client_conns_.erase(it);
+        continue;
+      }
+      c->set_scheduled_for_shutdown();
+    } else {
+      DCHECK(!found_conn);
+      found_conn = c;
+      // Appropriate connection is found; continue further to take care of the
+      // rest of connections to mark them for shutdown if they are not
+      // satisfying the policy.
     }
-    DCHECK_EQ(Connection::CLIENT, c->direction());
-    c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
-    client_conns_.erase(it);
+    ++it;
+  }
+  if (found_conn) {
+    // Found matching not-to-be-shutdown connection: return it as the result.
+    conn->swap(found_conn);
+    return Status::OK();
   }
 
   // No connection to this remote. Need to create one.
@@ -369,10 +413,11 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   RETURN_NOT_OK(CreateClientSocket(&sock));
   RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
 
-  std::unique_ptr<Socket> new_socket(new Socket(sock.Release()));
+  unique_ptr<Socket> new_socket(new Socket(sock.Release()));
 
   // Register the new connection in our map.
-  *conn = new Connection(this, conn_id.remote(), std::move(new_socket), Connection::CLIENT);
+  *conn = new Connection(
+      this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
   (*conn)->set_local_user_credentials(conn_id.user_credentials());
 
   // Kick off blocking client connection negotiation.
@@ -386,7 +431,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread");
 
   // Insert into the client connection map to avoid duplicate connection requests.
-  client_conns_.insert(conn_map_t::value_type(conn_id, *conn));
+  client_conns_.emplace(conn_id, *conn);
   ++total_client_conns_cnt_;
 
   return Status::OK();
@@ -467,9 +512,16 @@ void ReactorThread::DestroyConnection(Connection *conn,
   // Unlink connection from lists.
   if (conn->direction() == Connection::CLIENT) {
     ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
-    auto it = client_conns_.find(conn_id);
-    CHECK(it != client_conns_.end()) << "Couldn't find connection " << conn->ToString();
-    client_conns_.erase(it);
+    const auto range = client_conns_.equal_range(conn_id);
+    CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+    // The client_conns_ container is a multi-map.
+    for (auto it = range.first; it != range.second;) {
+      if (it->second.get() == conn) {
+        it = client_conns_.erase(it);
+        break;
+      }
+      ++it;
+    }
   } else if (conn->direction() == Connection::SERVER) {
     auto it = server_conns_.begin();
     while (it != server_conns_.end()) {
@@ -636,7 +688,7 @@ class RegisterConnectionTask : public ReactorTask {
 
 void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
   VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
-  std::unique_ptr<Socket> new_socket(new Socket(socket->Release()));
+  unique_ptr<Socket> new_socket(new Socket(socket->Release()));
   auto task = new RegisterConnectionTask(
       new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER));
   ScheduleReactorTask(task);
@@ -666,8 +718,7 @@ class AssignOutboundCallTask : public ReactorTask {
 void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
   DVLOG(3) << name_ << ": queueing outbound call "
            << call->ToString() << " to remote " << call->conn_id().remote().ToString();
-  AssignOutboundCallTask *task = new AssignOutboundCallTask(call);
-  ScheduleReactorTask(task);
+  ScheduleReactorTask(new AssignOutboundCallTask(call));
 }
 
 void Reactor::ScheduleReactorTask(ReactorTask *task) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index f31f69d..615cfc8 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -27,6 +27,7 @@
 
 #include <boost/function.hpp>
 #include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
 #include <ev++.h>
 
 #include "kudu/gutil/ref_counted.h"
@@ -51,6 +52,7 @@ class DumpRunningRpcsResponsePB;
 class Messenger;
 class MessengerBuilder;
 class Reactor;
+enum class CredentialsPolicy;
 
 // Simple metrics information from within a reactor.
 struct ReactorMetrics {
@@ -131,9 +133,11 @@ class ReactorThread {
  public:
   friend class Connection;
 
-  // Client-side connection map.
-  typedef std::unordered_map<ConnectionId, scoped_refptr<Connection>,
-                             ConnectionIdHash, ConnectionIdEqual> conn_map_t;
+  // Client-side connection map. Multiple connections could be open to a remote
+  // server if multiple credential policies are used for individual RPCs.
+  typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
+                                  ConnectionIdHash, ConnectionIdEqual>
+      conn_multimap_t;
 
   ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
 
@@ -200,6 +204,7 @@ class ReactorThread {
   // May return a bad Status if the connect() call fails.
   // The resulting connection object is managed internally by the reactor thread.
   Status FindOrStartConnection(const ConnectionId& conn_id,
+                               CredentialsPolicy cred_policy,
                                scoped_refptr<Connection>* conn);
 
   // Shut down the given connection, removing it from the connection tracking
@@ -255,7 +260,7 @@ class ReactorThread {
   MonoTime last_unused_tcp_scan_;
 
   // Map of sockaddrs to Connection objects for outbound (client) connections.
-  conn_map_t client_conns_;
+  conn_multimap_t client_conns_;
 
   // List of current connections coming into the server.
   conn_list_t server_conns_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 1652992..6b97b4b 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <atomic>
-#include <list>
 #include <memory>
 #include <string>
 
@@ -31,6 +30,7 @@
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/rtest.pb.h"
 #include "kudu/rpc/rtest.proxy.h"
@@ -50,10 +50,10 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/trace.h"
 
-namespace kudu { namespace rpc {
+namespace kudu {
+namespace rpc {
 
 using kudu::rpc_test::AddRequestPB;
-using kudu::rpc_test::AddRequestPartialPB;
 using kudu::rpc_test::AddResponsePB;
 using kudu::rpc_test::CalculatorError;
 using kudu::rpc_test::CalculatorServiceIf;
@@ -419,13 +419,15 @@ class RpcTestBase : public KuduTest {
     return messenger;
   }
 
-  Status DoTestSyncCall(const Proxy &p, const char *method) {
+  Status DoTestSyncCall(const Proxy &p, const char *method,
+                        CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
     AddRequestPB req;
     req.set_x(rand());
     req.set_y(rand());
     AddResponsePB resp;
     RpcController controller;
     controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+    controller.set_credentials_policy(policy);
     RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller));
 
     CHECK_EQ(req.x() + req.y(), resp.result());

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index aca9324..643e61f 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -291,6 +291,78 @@ TEST_P(TestRpc, TestReopenOutboundConnections) {
   }
 }
 
+// Test that an outbound connection is closed and a new one is open if going
+// from ANY_CREDENTIALS to PRIMARY_CREDENTIALS policy for RPC calls to the same
+// destination.
+// Test that changing from PRIMARY_CREDENTIALS policy to ANY_CREDENTIALS policy
+// re-uses the connection established with PRIMARY_CREDENTIALS policy.
+TEST_P(TestRpc, TestCredentialsPolicy) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+  // Verify the initial counters.
+  ReactorMetrics metrics;
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+
+  // Make an RPC call with ANY_CREDENTIALS policy.
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(0, metrics.total_client_connections_);
+  EXPECT_EQ(1, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_server_connections_);
+  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(1, metrics.total_client_connections_);
+  EXPECT_EQ(0, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_client_connections_);
+
+  // This is to allow all the data to be sent so the connection becomes idle.
+  SleepFor(MonoDelta::FromMilliseconds(5));
+
+  // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection
+  // with ANY_CREDENTIALS policy should be closed and a new one established
+  // with PRIMARY_CREDENTIALS policy.
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName,
+                           CredentialsPolicy::PRIMARY_CREDENTIALS));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(0, metrics.total_client_connections_);
+  EXPECT_EQ(2, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_server_connections_);
+  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(2, metrics.total_client_connections_);
+  EXPECT_EQ(0, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_client_connections_);
+
+  // Make another RPC call with ANY_CREDENTIALS policy. The already established
+  // connection with PRIMARY_CREDENTIALS policy should be re-used because
+  // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which
+  // the currently open connection has been established with.
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(0, metrics.total_client_connections_);
+  EXPECT_EQ(2, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_server_connections_);
+  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  EXPECT_EQ(2, metrics.total_client_connections_);
+  EXPECT_EQ(0, metrics.total_server_connections_);
+  EXPECT_EQ(1, metrics.num_client_connections_);
+}
+
 // Test that a call which takes longer than the keepalive time
 // succeeds -- i.e that we don't consider a connection to be "idle" on the
 // server if there is a call outstanding on it.

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 5e5cbc3..9120b72 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -18,18 +18,21 @@
 #include "kudu/rpc/rpc_controller.h"
 
 #include <algorithm>
-#include <glog/logging.h>
 #include <memory>
 #include <mutex>
 
+#include <glog/logging.h>
+
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/outbound_call.h"
 
 using std::unique_ptr;
 
-namespace kudu { namespace rpc {
+namespace kudu {
+namespace rpc {
 
-RpcController::RpcController() {
+RpcController::RpcController()
+    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS) {
   DVLOG(4) << "RpcController " << this << " constructed";
 }
 
@@ -48,6 +51,7 @@ void RpcController::Swap(RpcController* other) {
 
   std::swap(outbound_sidecars_, other->outbound_sidecars_);
   std::swap(timeout_, other->timeout_);
+  std::swap(credentials_policy_, other->credentials_policy_);
   std::swap(call_, other->call_);
 }
 
@@ -58,6 +62,7 @@ void RpcController::Reset() {
   }
   call_.reset();
   required_server_features_.clear();
+  credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
 }
 
 bool RpcController::finished() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index 6d521d0..db714bf 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -17,12 +17,12 @@
 #ifndef KUDU_RPC_RPC_CONTROLLER_H
 #define KUDU_RPC_RPC_CONTROLLER_H
 
-#include <functional>
-#include <glog/logging.h>
 #include <memory>
 #include <unordered_set>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/util/locks.h"
@@ -44,6 +44,24 @@ class OutboundCall;
 class RequestIdPB;
 class RpcSidecar;
 
+// Authentication credentials policy for outbound RPCs. Some RPC methods
+// (e.g. MasterService::ConnectToMaster) behave differently depending on the
+// type of credentials used for authentication when establishing the connection.
+// The client expecting some particular results from the call should specify
+// the required policy on a per-call basis using RpcController. By default,
+// RpcController uses ANY_CREDENTIALS.
+enum class CredentialsPolicy {
+  // It's acceptable to use authentication credentials of any type, primary or
+  // secondary ones.
+  ANY_CREDENTIALS,
+
+  // Only primary credentials are acceptable. Primary credentials are Kerberos
+  // tickets and TLS certificates. Secondary credentials are authentication
+  // tokens: they are 'derived' in the sense that it's possible to acquire them
+  // using 'primary' credentials.
+  PRIMARY_CREDENTIALS,
+};
+
 // Controller for managing properties of a single RPC call, on the client side.
 //
 // An RpcController maps to exactly one call and is not thread-safe. The client
@@ -115,7 +133,7 @@ class RpcController {
   // Using an uninitialized deadline means the call won't time out.
   void set_deadline(const MonoTime& deadline);
 
-  // Allows settting the request id for the next request sent to the server.
+  // Allows setting the request id for the next request sent to the server.
   // A request id allows the server to identify each request sent by the client uniquely,
   // in some cases even when sent to multiple servers, enabling exactly once semantics.
   void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
@@ -180,6 +198,14 @@ class RpcController {
   // Return the configured timeout.
   MonoDelta timeout() const;
 
+  CredentialsPolicy credentials_policy() const {
+    return credentials_policy_;
+  }
+
+  void set_credentials_policy(CredentialsPolicy policy) {
+    credentials_policy_ = policy;
+  }
+
   // Fills the 'sidecar' parameter with the slice pointing to the i-th
   // sidecar upon success.
   //
@@ -205,10 +231,13 @@ class RpcController {
   MonoDelta timeout_;
   std::unordered_set<uint32_t> required_server_features_;
 
+  // RPC authentication policy for outbound calls.
+  CredentialsPolicy credentials_policy_;
+
   mutable simple_spinlock lock_;
 
   // The id of this request.
-  // Ownership is transfered to OutboundCall once the call is sent.
+  // Ownership is transferred to OutboundCall once the call is sent.
   std::unique_ptr<RequestIdPB> request_id_;
 
   // Once the call is sent, it is tracked here.

http://git-wip-us.apache.org/repos/asf/kudu/blob/58f1a216/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 4070600..2fe0708 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -235,7 +235,7 @@ TEST_F(RpcStubTest, TestRemoteAddress) {
 TEST_F(RpcStubTest, TestCallWithInvalidParam) {
   Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
 
-  AddRequestPartialPB req;
+  rpc_test::AddRequestPartialPB req;
   req.set_x(rand());
   // AddRequestPartialPB is missing the 'y' field.
   AddResponsePB resp;


Mime
View raw message