Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8BA74200BB3 for ; Wed, 2 Nov 2016 22:39:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8A36C160AF0; Wed, 2 Nov 2016 21:39:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 67665160B0D for ; Wed, 2 Nov 2016 22:39:45 +0100 (CET) Received: (qmail 89499 invoked by uid 500); 2 Nov 2016 21:39:44 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 89339 invoked by uid 99); 2 Nov 2016 21:39:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Nov 2016 21:39:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 071FCE04EE; Wed, 2 Nov 2016 21:39:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: todd@apache.org To: commits@kudu.apache.org Date: Wed, 02 Nov 2016 21:39:48 -0000 Message-Id: <3893d8bebe50470e9f4fea839941c5d1@git.apache.org> In-Reply-To: <86bd5cd21af645cab89e0320c1a46194@git.apache.org> References: <86bd5cd21af645cab89e0320c1a46194@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/6] kudu git commit: KuduRPC integration with OpenSSL archived-at: Wed, 02 Nov 2016 21:39:47 -0000 KuduRPC integration with OpenSSL This patch adds SSL support for the RPC layer in Kudu. It uses the OpenSSL library for this purpose. This is achieved by subclassing 'Socket' to add 'SSLSocket' which calls into the OpenSSL library for auth/read/write/shutdown. SSL is enabled only at a 'Connection' object level, i.e. the AcceptorPool still only works with regular 'Socket's, and the reactor threads also use a regular 'Socket' on a new outgoing call. The first point of any SSL activity happens in the context of the negotiation pool, where the SSL handshake happens before the SASL handshake. On a successful handshake, further communication is encrypted on that 'Connection'. A 'SSLFactory' is created at the 'Messenger' level. This factory is in-charge of creating 'SSLSocket' objects when necessary and also keeps a track of the certificates, keys and the SSL context that is shared among all the sockets of that 'Messenger'. The x509_check_host() function is ported from OpenSSL-1.1.0b which we use when the version of OpenSSL detected is less than 1.0.2. It is added in util/x509_check_host.cc/h Added a parameter to the rpc-test to additionally run the tests with SSL enabled. Future TODOs: - Allow loading keys as strings vs files. (Need to use different APIs) - Consider porting x509_check_ip and x509_check_ip_ascii. Change-Id: I27167faa4e6a78e59b46093055b16682c93af0ea Reviewed-on: http://gerrit.cloudera.org:8080/4789 Reviewed-by: Sailesh Mukil Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin Reviewed-by: Todd Lipcon Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bd436125 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bd436125 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bd436125 Branch: refs/heads/master Commit: bd4361254d5c360fc103e6d2a4433c6e369cf49b Parents: d5d9afb Author: Sailesh Mukil Authored: Thu Oct 20 14:22:14 2016 -0700 Committer: Todd Lipcon Committed: Wed Nov 2 21:39:13 2016 +0000 ---------------------------------------------------------------------- CMakeLists.txt | 7 + LICENSE.txt | 125 +++++++++ NOTICE.txt | 7 + build-support/lint.sh | 4 +- src/kudu/rpc/connection.cc | 28 +- src/kudu/rpc/connection.h | 13 +- src/kudu/rpc/messenger.cc | 25 ++ src/kudu/rpc/messenger.h | 9 + src/kudu/rpc/negotiation.cc | 2 + src/kudu/rpc/reactor.cc | 21 +- src/kudu/rpc/rpc-test-base.h | 99 ++++++- src/kudu/rpc/rpc-test.cc | 112 ++++---- src/kudu/rpc/sasl_client.cc | 14 +- src/kudu/rpc/sasl_client.h | 5 +- src/kudu/rpc/sasl_rpc-test.cc | 28 +- src/kudu/rpc/sasl_server.cc | 18 +- src/kudu/rpc/sasl_server.h | 7 +- src/kudu/util/CMakeLists.txt | 11 + src/kudu/util/net/net_util-test.cc | 1 + src/kudu/util/net/socket.cc | 13 + src/kudu/util/net/socket.h | 17 +- src/kudu/util/net/ssl_factory.cc | 141 ++++++++++ src/kudu/util/net/ssl_factory.h | 66 +++++ src/kudu/util/net/ssl_socket.cc | 173 +++++++++++++ src/kudu/util/net/ssl_socket.h | 60 +++++ src/kudu/util/x509_check_host.cc | 441 ++++++++++++++++++++++++++++++++ src/kudu/util/x509_check_host.h | 48 ++++ 27 files changed, 1373 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d3391e..1d44f29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -830,6 +830,13 @@ include_directories(SYSTEM ${SQUEASEL_INCLUDE_DIR}) ADD_THIRDPARTY_LIB(squeasel STATIC_LIB "${SQUEASEL_STATIC_LIB}") +find_package(OpenSSL REQUIRED) +include_directories(${OPENSSL_INCLUDE_DIR}) +ADD_THIRDPARTY_LIB(openssl_ssl + SHARED_LIB "${OPENSSL_SSL_LIBRARY}") +ADD_THIRDPARTY_LIB(openssl_crypto + SHARED_LIB "${OPENSSL_CRYPTO_LIBRARY}") + ## Google PerfTools ## ## Disabled with TSAN/ASAN as well as with gold+dynamic linking (see comment http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/LICENSE.txt ---------------------------------------------------------------------- diff --git a/LICENSE.txt b/LICENSE.txt index a732ad1..afcb22d 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -476,6 +476,131 @@ under the following 3-clause BSD license: ================================================================================ +src/kudu/util/x509_check_host.*: OpenSSL software license: + +LICENSE ISSUES + ============== + + The OpenSSL toolkit stays under a dual license, i.e. both the conditions of + the OpenSSL License and the original SSLeay license apply to the toolkit. + See below for the actual license texts. + + OpenSSL License + --------------- + ==================================================================== + Copyright (c) 1998-2016 The OpenSSL Project. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + + 3. All advertising materials mentioning features or use of this + software must display the following acknowledgment: + "This product includes software developed by the OpenSSL Project + for use in the OpenSSL Toolkit. (http://www.openssl.org/)" + + 4. The names "OpenSSL Toolkit" and "OpenSSL Project" must not be used to + endorse or promote products derived from this software without + prior written permission. For written permission, please contact + openssl-core@openssl.org. + + 5. Products derived from this software may not be called "OpenSSL" + nor may "OpenSSL" appear in their names without prior written + permission of the OpenSSL Project. + + 6. Redistributions of any form whatsoever must retain the following + acknowledgment: + "This product includes software developed by the OpenSSL Project + for use in the OpenSSL Toolkit (http://www.openssl.org/)" + + THIS SOFTWARE IS PROVIDED BY THE OpenSSL PROJECT ``AS IS'' AND ANY + EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE OpenSSL PROJECT OR + ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + OF THE POSSIBILITY OF SUCH DAMAGE. + ==================================================================== + + This product includes cryptographic software written by Eric Young + (eay@cryptsoft.com). This product includes software written by Tim + Hudson (tjh@cryptsoft.com). + + + Original SSLeay License + ----------------------- + + Copyright (C) 1995-1998 Eric Young (eay@cryptsoft.com) + All rights reserved. + + This package is an SSL implementation written + by Eric Young (eay@cryptsoft.com). + The implementation was written so as to conform with Netscapes SSL. + + This library is free for commercial and non-commercial use as long as + the following conditions are aheared to. The following conditions + apply to all code found in this distribution, be it the RC4, RSA, + lhash, DES, etc., code; not just the SSL code. The SSL documentation + included with this distribution is covered by the same copyright terms + except that the holder is Tim Hudson (tjh@cryptsoft.com). + + Copyright remains Eric Young's, and as such any Copyright notices in + the code are not to be removed. + If this package is used in a product, Eric Young should be given attribution + as the author of the parts of the library used. + This can be in the form of a textual message at program startup or + in documentation (online or textual) provided with the package. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. All advertising materials mentioning features or use of this software + must display the following acknowledgement: + "This product includes cryptographic software written by + Eric Young (eay@cryptsoft.com)" + The word 'cryptographic' can be left out if the rouines from the library + being used are not cryptographic related :-). + 4. If you include any Windows specific code (or a derivative thereof) from + the apps directory (application code) you must include an acknowledgement: + "This product includes software written by Tim Hudson (tjh@cryptsoft.com)" + + THIS SOFTWARE IS PROVIDED BY ERIC YOUNG ``AS IS'' AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + SUCH DAMAGE. + + The licence and distribution terms for any publically available version or + derivative of this code cannot be changed. i.e. this code cannot simply be + copied and put under another distribution licence + [including the GNU Public Licence.] + +================================================================================ + The following dependencies or pieces of incorporated source code have licenses under one of the following categories: http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/NOTICE.txt ---------------------------------------------------------------------- diff --git a/NOTICE.txt b/NOTICE.txt index f41d702..b092a1f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6,3 +6,10 @@ The Apache Software Foundation (http://www.apache.org/). Portions of this software were developed at Cloudera, Inc (http://www.cloudera.com/). + +This product includes software developed by the OpenSSL +Project for use in the OpenSSL Toolkit (http://www.openssl.org/) + +This product includes cryptographic software written by Eric Young +(eay@cryptsoft.com). This product includes software written by Tim +Hudson (tjh@cryptsoft.com). http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/build-support/lint.sh ---------------------------------------------------------------------- diff --git a/build-support/lint.sh b/build-support/lint.sh index 6cf43cc..61279bc 100755 --- a/build-support/lint.sh +++ b/build-support/lint.sh @@ -38,13 +38,13 @@ done if $ONLY_CHANGED; then FILES=$(git diff --name-only $($ROOT/build-support/get-upstream-commit.sh) \ - | egrep '\.(cc|h)$' | grep -v "gutil\|trace_event") + | egrep '\.(cc|h)$' | grep -v "gutil\|trace_event\|x509_check_host") if [ -z "$FILES" ]; then echo No source files changed exit 0 fi else - FILES=$(find $ROOT/src -name '*.cc' -or -name '*.h' | grep -v "\.pb\.\|\.service\.\|\.proxy\.\|\.krpc\.\|gutil\|trace_event\|kudu_export\.h") + FILES=$(find $ROOT/src -name '*.cc' -or -name '*.h' | grep -v "\.pb\.\|\.service\.\|\.proxy\.\|\.krpc\.\|gutil\|trace_event\|kudu_export\.h\|x509_check_host") fi cd $ROOT http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/connection.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc index e074ec0..d5c3080 100644 --- a/src/kudu/rpc/connection.cc +++ b/src/kudu/rpc/connection.cc @@ -43,6 +43,8 @@ #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/ssl_factory.h" +#include "kudu/util/net/ssl_socket.h" #include "kudu/util/status.h" #include "kudu/util/trace.h" @@ -62,33 +64,34 @@ namespace rpc { /// Connection /// Connection::Connection(ReactorThread *reactor_thread, Sockaddr remote, - int socket, Direction direction) + Socket* socket, Direction direction) : reactor_thread_(reactor_thread), - socket_(socket), remote_(std::move(remote)), + socket_(socket), direction_(direction), last_activity_time_(MonoTime::Now()), is_epoll_registered_(false), next_call_id_(1), sasl_client_(kSaslAppName, socket), sasl_server_(kSaslAppName, socket), - negotiation_complete_(false) {} + negotiation_complete_(false) { +} Status Connection::SetNonBlocking(bool enabled) { - return socket_.SetNonBlocking(enabled); + return socket_->SetNonBlocking(enabled); } void Connection::EpollRegister(ev::loop_ref& loop) { DCHECK(reactor_thread_->IsCurrentThread()); DVLOG(4) << "Registering connection for epoll: " << ToString(); write_io_.set(loop); - write_io_.set(socket_.GetFd(), ev::WRITE); + write_io_.set(socket_->GetFd(), ev::WRITE); write_io_.set(this); if (direction_ == CLIENT && negotiation_complete_) { write_io_.start(); } read_io_.set(loop); - read_io_.set(socket_.GetFd(), ev::READ); + read_io_.set(socket_->GetFd(), ev::READ); read_io_.set(this); read_io_.start(); is_epoll_registered_ = true; @@ -167,7 +170,7 @@ void Connection::Shutdown(const Status &status) { read_io_.stop(); write_io_.stop(); is_epoll_registered_ = false; - WARN_NOT_OK(socket_.Close(), "Error closing socket"); + WARN_NOT_OK(socket_->Close(), "Error closing socket"); } void Connection::QueueOutbound(gscoped_ptr transfer) { @@ -453,7 +456,7 @@ void Connection::ReadHandler(ev::io &watcher, int revents) { if (!inbound_) { inbound_.reset(new InboundTransfer()); } - Status status = inbound_->ReceiveBuffer(socket_); + Status status = inbound_->ReceiveBuffer(*socket_); if (PREDICT_FALSE(!status.ok())) { if (status.posix_code() == ESHUTDOWN) { VLOG(1) << ToString() << " shut down by remote end."; @@ -591,7 +594,7 @@ void Connection::WriteHandler(ev::io &watcher, int revents) { } last_activity_time_ = reactor_thread_->cur_time(); - Status status = transfer->SendBuffer(socket_); + Status status = transfer->SendBuffer(*socket_); if (PREDICT_FALSE(!status.ok())) { LOG(WARNING) << ToString() << " send error: " << status.ToString(); reactor_thread_->DestroyConnection(this, status); @@ -622,6 +625,13 @@ std::string Connection::ToString() const { remote_.ToString()); } +Status Connection::InitSSLIfNecessary() { + if (!reactor_thread_->reactor()->messenger()->ssl_enabled()) return Status::OK(); + SSLSocket* ssl_socket = down_cast(socket_.get()); + RETURN_NOT_OK(ssl_socket->DoHandshake()); + return Status::OK(); +} + Status Connection::InitSaslClient() { // Note that remote_.host() is an IP address here: we've already lost // whatever DNS name the client was attempting to use. Unless krb5 http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/connection.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h index d12a168..bb2aa94 100644 --- a/src/kudu/rpc/connection.h +++ b/src/kudu/rpc/connection.h @@ -80,7 +80,7 @@ class Connection : public RefCountedThreadSafe { // remote: the address of the remote end // socket: the socket to take ownership of. // direction: whether we are the client or server side - Connection(ReactorThread *reactor_thread, Sockaddr remote, int socket, + Connection(ReactorThread *reactor_thread, Sockaddr remote, Socket* socket, Direction direction); // Set underlying socket to non-blocking (or blocking) mode. @@ -141,7 +141,7 @@ class Connection : public RefCountedThreadSafe { Direction direction() const { return direction_; } - Socket *socket() { return &socket_; } + Socket* socket() { return socket_.get(); } // Return SASL client instance for this connection. SaslClient &sasl_client() { return sasl_client_; } @@ -149,6 +149,9 @@ class Connection : public RefCountedThreadSafe { // Return SASL server instance for this connection. SaslServer &sasl_server() { return sasl_server_; } + // Initialize underlying SSLSocket if SSL is enabled. + Status InitSSLIfNecessary(); + // Initialize SASL client before negotiation begins. Status InitSaslClient(); @@ -225,12 +228,12 @@ class Connection : public RefCountedThreadSafe { // The reactor thread that created this connection. ReactorThread * const reactor_thread_; - // The socket we're communicating on. - Socket socket_; - // The remote address we're talking to. const Sockaddr remote_; + // The socket we're communicating on. + std::unique_ptr socket_; + // The credentials of the user operating on this connection (if a client user). UserCredentials user_credentials_; http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/messenger.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc index 4f1f0e3..437add7 100644 --- a/src/kudu/rpc/messenger.cc +++ b/src/kudu/rpc/messenger.cc @@ -47,6 +47,7 @@ #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/socket.h" +#include "kudu/util/net/ssl_factory.h" #include "kudu/util/status.h" #include "kudu/util/threadpool.h" #include "kudu/util/trace.h" @@ -55,9 +56,23 @@ using std::string; using std::shared_ptr; using strings::Substitute; + +DEFINE_string(rpc_ssl_server_certificate, "", "Path to the SSL certificate to be used for the RPC " + "layer."); +DEFINE_string(rpc_ssl_private_key, "", + "Path to the private key to be used to complement the public key present in " + "--ssl_server_certificate"); +DEFINE_string(rpc_ssl_certificate_authority, "", + "Path to the certificate authority to be used by the client side of the connection to verify " + "the validity of the certificate presented by the server."); + DEFINE_int32(rpc_default_keepalive_time_ms, 65000, "If an RPC connection from a client is idle for this amount of time, the server " "will disconnect the client."); + +TAG_FLAG(rpc_ssl_server_certificate, experimental); +TAG_FLAG(rpc_ssl_private_key, experimental); +TAG_FLAG(rpc_ssl_certificate_authority, experimental); TAG_FLAG(rpc_default_keepalive_time_ms, advanced); DEFINE_bool(server_require_kerberos, false, @@ -162,6 +177,7 @@ void Messenger::Shutdown() { for (Reactor* reactor : reactors_) { reactor->Shutdown(); } + ssl_factory_.reset(); } Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr, @@ -276,6 +292,15 @@ Reactor* Messenger::RemoteToReactor(const Sockaddr &remote) { Status Messenger::Init() { Status status; + ssl_enabled_ = !FLAGS_rpc_ssl_server_certificate.empty() || !FLAGS_rpc_ssl_private_key.empty() + || !FLAGS_rpc_ssl_certificate_authority.empty(); + if (ssl_enabled_) { + ssl_factory_.reset(new SSLFactory()); + RETURN_NOT_OK(ssl_factory_->Init()); + RETURN_NOT_OK(ssl_factory_->LoadCertificate(FLAGS_rpc_ssl_server_certificate)); + RETURN_NOT_OK(ssl_factory_->LoadPrivateKey(FLAGS_rpc_ssl_private_key)); + RETURN_NOT_OK(ssl_factory_->LoadCertificateAuthority(FLAGS_rpc_ssl_certificate_authority)); + } for (Reactor* r : reactors_) { RETURN_NOT_OK(r->Init()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/messenger.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h index a7ac1ae..09c8b53 100644 --- a/src/kudu/rpc/messenger.h +++ b/src/kudu/rpc/messenger.h @@ -39,6 +39,7 @@ namespace kudu { class Socket; +class SSLFactory; class ThreadPool; namespace rpc { @@ -176,6 +177,8 @@ class Messenger { void ScheduleOnReactor(const boost::function& func, MonoDelta when); + SSLFactory* ssl_factory() const { return ssl_factory_.get(); } + ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); } RpczStore* rpcz_store() { return rpcz_store_.get(); } @@ -186,6 +189,8 @@ class Messenger { return name_; } + bool ssl_enabled() const { return ssl_enabled_; } + bool closing() const { shared_lock l(lock_.get_lock()); return closing_; @@ -216,6 +221,8 @@ class Messenger { bool closing_; + bool ssl_enabled_; + // Pools which are listening on behalf of this messenger. // Note that the user may have called Shutdown() on one of these // pools, so even though we retain the reference, it may no longer @@ -229,6 +236,8 @@ class Messenger { gscoped_ptr negotiation_pool_; + gscoped_ptr ssl_factory_; + std::unique_ptr rpcz_store_; scoped_refptr metric_entity_; http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/negotiation.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc index be5b504..e42a884 100644 --- a/src/kudu/rpc/negotiation.cc +++ b/src/kudu/rpc/negotiation.cc @@ -184,6 +184,7 @@ static Status DoClientNegotiation(Connection* conn, RETURN_NOT_OK(WaitForClientConnect(conn, deadline)); RETURN_NOT_OK(conn->SetNonBlocking(false)); + RETURN_NOT_OK(conn->InitSSLIfNecessary()); conn->sasl_client().set_deadline(deadline); RETURN_NOT_OK(conn->sasl_client().Negotiate()); RETURN_NOT_OK(SendConnectionContext(conn, deadline)); @@ -201,6 +202,7 @@ static Status DoServerNegotiation(Connection* conn, SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_inject_delay_ms)); } RETURN_NOT_OK(conn->SetNonBlocking(false)); + RETURN_NOT_OK(conn->InitSSLIfNecessary()); RETURN_NOT_OK(conn->InitSaslServer()); conn->sasl_server().set_deadline(deadline); RETURN_NOT_OK(conn->sasl_server().Negotiate()); http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/reactor.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc index e8e0054..99aa2e6 100644 --- a/src/kudu/rpc/reactor.cc +++ b/src/kudu/rpc/reactor.cc @@ -44,12 +44,14 @@ #include "kudu/util/errno.h" #include "kudu/util/flag_tags.h" #include "kudu/util/monotime.h" +#include "kudu/util/status.h" #include "kudu/util/thread.h" #include "kudu/util/threadpool.h" #include "kudu/util/thread_restrictions.h" #include "kudu/util/trace.h" -#include "kudu/util/status.h" #include "kudu/util/net/socket.h" +#include "kudu/util/net/ssl_factory.h" +#include "kudu/util/net/ssl_socket.h" // When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop. // Otherwise we run into problems because 'select' can't handle connections when more than 1024 @@ -335,8 +337,15 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId &conn_id, bool connect_in_progress; RETURN_NOT_OK(StartConnect(&sock, conn_id.remote(), &connect_in_progress)); + std::unique_ptr new_socket; + if (reactor()->messenger()->ssl_enabled()) { + new_socket = reactor()->messenger()->ssl_factory()->CreateSocket(sock.Release(), false); + } else { + new_socket.reset(new Socket(sock.Release())); + } + // Register the new connection in our map. - *conn = new Connection(this, conn_id.remote(), sock.Release(), Connection::CLIENT); + *conn = new Connection(this, conn_id.remote(), new_socket.release(), Connection::CLIENT); (*conn)->set_user_credentials(conn_id.user_credentials()); // Kick off blocking client connection negotiation. @@ -594,8 +603,14 @@ class RegisterConnectionTask : public ReactorTask { void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr &remote) { VLOG(3) << name_ << ": new inbound connection to " << remote.ToString(); + std::unique_ptr new_socket; + if (messenger()->ssl_enabled()) { + new_socket = messenger()->ssl_factory()->CreateSocket(socket->Release(), true); + } else { + new_socket.reset(new Socket(socket->Release())); + } scoped_refptr conn( - new Connection(&thread_, remote, socket->Release(), Connection::SERVER)); + new Connection(&thread_, remote, new_socket.release(), Connection::SERVER)); auto task = new RegisterConnectionTask(conn); ScheduleReactorTask(task); } http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/rpc-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index 9aafe5d..d747fda 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -37,15 +37,21 @@ #include "kudu/rpc/rtest.service.h" #include "kudu/rpc/service_if.h" #include "kudu/rpc/service_pool.h" +#include "kudu/util/env.h" #include "kudu/util/faststring.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/path_util.h" #include "kudu/util/random.h" #include "kudu/util/random_util.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_util.h" #include "kudu/util/trace.h" +DECLARE_string(rpc_ssl_server_certificate); +DECLARE_string(rpc_ssl_private_key); +DECLARE_string(rpc_ssl_certificate_authority); + namespace kudu { namespace rpc { using kudu::rpc_test::AddRequestPB; @@ -310,6 +316,75 @@ const char *GenericCalculatorService::kFirstString = const char *GenericCalculatorService::kSecondString = "2222222222222222222222222222222222222222222222222222222222222222222222"; +// Writes the test SSL certificate into a temporary file. +inline Status CreateSSLServerCert(const std::string& file_path) { + static const char* test_server_cert = R"( +-----BEGIN CERTIFICATE----- +MIIEejCCA2KgAwIBAgIJAKMdvDR5PL82MA0GCSqGSIb3DQEBBQUAMIGEMQswCQYD +VQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5j +aXNjbzERMA8GA1UEChMIQ2xvdWRlcmExEjAQBgNVBAMTCWxvY2FsaG9zdDEhMB8G +CSqGSIb3DQEJARYSaGVucnlAY2xvdWRlcmEuY29tMB4XDTEzMDkyMjAwMjUxOFoX +DTQxMDIwNzAwMjUxOFowgYQxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9y +bmlhMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMREwDwYDVQQKEwhDbG91ZGVyYTES +MBAGA1UEAxMJbG9jYWxob3N0MSEwHwYJKoZIhvcNAQkBFhJoZW5yeUBjbG91ZGVy +YS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCoUj3pMQ2ELkrz +zq+koixljVFBAEEqwUWSjA+GEKwfFb/UPRjeO/wrKndp2r83jc6KRt66rvAIl8cr +b54yTOsJ/ZcARrjTwG3IG8Tely/54ZQyH0ImdJyEbCSoI04zX3ovjlppz3g5xanj +WmpAh6pzPgBOTfisCLMPD70xQ8F//QWZdNatoly54STkTWoJv/Oll/UpXcBY8JOR ++ytX82eGgG4F8YoQqmbjrrN5JAmqLRiBAkr3WUena6ekqJBalJRzex/Wh8a9XEV7 +9HFVVngBhezsOJgf81hzBzzhULKfxuXl8uaUj3Z9cZg39CDsyz+ULYbsPm8VoMUI +VCf7MUVTAgMBAAGjgewwgekwHQYDVR0OBBYEFK94kea7jIKQawAIb+0DqsA1rf6n +MIG5BgNVHSMEgbEwga6AFK94kea7jIKQawAIb+0DqsA1rf6noYGKpIGHMIGEMQsw +CQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UEBxMNU2FuIEZy +YW5jaXNjbzERMA8GA1UEChMIQ2xvdWRlcmExEjAQBgNVBAMTCWxvY2FsaG9zdDEh +MB8GCSqGSIb3DQEJARYSaGVucnlAY2xvdWRlcmEuY29tggkAox28NHk8vzYwDAYD +VR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAEtkPPncCnN2IFVJvz04K+VsX +b6w3qwPynQKc67+++JkNb3TYKrh/0UVM1NrEOu3TGplqOrKgAlITuaWNqNOSBu1R +WJtrz85YkonED5awjjuALVEY82+c7pOXkuv5G5421RINfRn2hNzgw8VFb5CEvxHH +jER80Vx6UGKr/S649qTQ8AzVzTwWS86VsGI2azAD7D67G/IDGf+0P7FsXonKY+vl +vKzkfaO1+qEOLtDHV9mwlsxl3Re/MNym4ExWHi9txynCNiRZHqWoZUS+KyYqIR2q +seCrQwgi1Fer9Ekd5XNjWjigC3VC3SjMqWaxeKbZ2/AuABJMz5xSiRkgwphXEQ== +-----END CERTIFICATE----- + )"; + RETURN_NOT_OK(WriteStringToFile(Env::Default(), test_server_cert, file_path)); + return Status::OK(); +} + +// Writes the test SSL private key into a temporary file. +inline Status CreateSSLPrivateKey(const std::string& file_path) { + static const char* test_private_key = R"( +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAqFI96TENhC5K886vpKIsZY1RQQBBKsFFkowPhhCsHxW/1D0Y +3jv8Kyp3adq/N43Oikbeuq7wCJfHK2+eMkzrCf2XAEa408BtyBvE3pcv+eGUMh9C +JnSchGwkqCNOM196L45aac94OcWp41pqQIeqcz4ATk34rAizDw+9MUPBf/0FmXTW +raJcueEk5E1qCb/zpZf1KV3AWPCTkfsrV/NnhoBuBfGKEKpm466zeSQJqi0YgQJK +91lHp2unpKiQWpSUc3sf1ofGvVxFe/RxVVZ4AYXs7DiYH/NYcwc84VCyn8bl5fLm +lI92fXGYN/Qg7Ms/lC2G7D5vFaDFCFQn+zFFUwIDAQABAoIBABNTpiIxbLDhs998 +uvQ3XsumR08kXVcwa/GgvWOSZIEJOUaAYWubDaBTNvTjlhMl6DI+YvKihZMnAkp9 +fXefF1nFUWJJvI0ryi8w6RD54RtbCG4c4raRqysVU7wumZsSenAdc0o09UQE6zXc +uth/+1VSKCzVjRkLwquXDg0rD3vHfQHWQvQlzwUh3OACA3LfLezVqzrEB02YVRxm +xwg5veeMg6Aod8vsvsajry9eE0hKeFGonRANerL9lwZxzD2ZjU2fSEJYY3xxKVgi +o+QVTKaAt9pivOs10YVZDcIDH0xmDpxAkaLb5ZAbnjwhf7WGYgEm8VruAHkJxyXX +yPf3rpkCgYEA0dp/Xv5KBIkD6JJao8hnhtP5x9U7o/pTzRxaO3WUflvTI6DtC0nk +cTOwFVs4WljT76T937q2x4stuX0woHzvIaZ6wwZ2vv81ikDY6pE8eLWsH/xFAmkx +HBfkSijFgJV6EpTqUnFD7QKU89tzWrh/kxaMO1WgFaBhxPPs3K1LDTUCgYEAzVW5 +3yjfVHNgjWTeAbnbToGvUihOidvIvS5cVo5q0Dhfabz0tiXFxAoQUGErUGPC8Nu2 +n/HxTI3b0PbCCwjBsDNqX2kzxTSe5aTGIrBUWbped1bxme8jggXuWYbg8vvLpsYf +wAJPxnGIxW/v/aftHUhbTIuVfZX2+UnilrwiwWcCgYEAg8paz4eXaH277KVtMwq6 +qZwac/tgNz0Qv/GcYVcYaLq2QNvhwoMnakhxvxfIrkS25PuTTJxwCaVIlAMhNMkB +TPrGghBfJtgUAb1z/Ow1NAG0FWpS1I7HfsMqZcBxOK2nOmA3QItNg11pujQJn+Ha +jL9OVj0SCkLs48nk6ToTtjkCgYEAh8YCtNwq6IWuN3CWGCAUMpIwIqxCWof48Zch +OZ7MZEiSVrG6QmMxpRJefTfzUyHUOj2eQZ7SxqMa0c8IuhEdOeyVjudaczD7TLAq +z68252oDovfbo8Tr/sL7OzmjryfuHqXtQqKEq5xRKvR8hYavlGhO7otx2uv5thcz +/CYE+UsCgYAsgewfzbcBqJwsAUs98DK99EU8VqKuaYlU5wNvAVb27O6sVeysSokM +G1TGIXJPphA3dSAe4Pf/j4ff/eGaS20FAFhs4BPpw0fAeCHpmD0BjIba0lxBS/gY +dc+JVPKL8Fe4a8fmsI6ndcZQ9qpOdZM5WOD0ldKRc+SsrYKkTmOOJQ== +-----END RSA PRIVATE KEY----- + )"; + RETURN_NOT_OK(WriteStringToFile(Env::Default(), test_private_key, file_path)); + return Status::OK(); +} + class RpcTestBase : public KuduTest { public: RpcTestBase() @@ -337,7 +412,17 @@ class RpcTestBase : public KuduTest { protected: std::shared_ptr CreateMessenger(const string &name, - int n_reactors = 1) { + int n_reactors = 1, + bool enable_ssl = false) { + if (enable_ssl) { + std::string server_cert_path = JoinPathSegments(GetTestDataDirectory(), "server-cert.pem"); + std::string private_key_path = JoinPathSegments(GetTestDataDirectory(), "server-key.pem"); + CHECK_OK(CreateSSLServerCert(server_cert_path)); + CHECK_OK(CreateSSLPrivateKey(private_key_path)); + FLAGS_rpc_ssl_server_certificate = server_cert_path; + FLAGS_rpc_ssl_private_key = private_key_path; + FLAGS_rpc_ssl_certificate_authority = server_cert_path; + } MessengerBuilder bld(name); bld.set_num_reactors(n_reactors); bld.set_connection_keepalive_time( @@ -418,12 +503,12 @@ class RpcTestBase : public KuduTest { LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds(); } - void StartTestServer(Sockaddr *server_addr) { - DoStartTestServer(server_addr); + void StartTestServer(Sockaddr *server_addr, bool enable_ssl = false) { + DoStartTestServer(server_addr, enable_ssl); } - void StartTestServerWithGeneratedCode(Sockaddr *server_addr) { - DoStartTestServer(server_addr); + void StartTestServerWithGeneratedCode(Sockaddr *server_addr, bool enable_ssl = false) { + DoStartTestServer(server_addr, enable_ssl); } // Start a simple socket listening on a local port, returning the address. @@ -449,8 +534,8 @@ class RpcTestBase : public KuduTest { } template - void DoStartTestServer(Sockaddr *server_addr) { - server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_); + void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) { + server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl); std::shared_ptr pool; ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool)); ASSERT_OK(pool->Start(2)); http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index 430cd7b..7d9840e 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -49,9 +49,12 @@ using std::vector; namespace kudu { namespace rpc { -class TestRpc : public RpcTestBase { +class TestRpc : public RpcTestBase, public ::testing::WithParamInterface { }; +// This is used to run all parameterized tests with and without SSL. +INSTANTIATE_TEST_CASE_P(OptionalSSL, TestRpc, testing::Values(false, true)); + TEST_F(TestRpc, TestSockaddr) { Sockaddr addr1, addr2; addr1.set_port(1000); @@ -67,8 +70,8 @@ TEST_F(TestRpc, TestSockaddr) { ASSERT_EQ(string("0.0.0.0:1000"), addr3.ToString()); } -TEST_F(TestRpc, TestMessengerCreateDestroy) { - shared_ptr messenger(CreateMessenger("TestCreateDestroy")); +TEST_P(TestRpc, TestMessengerCreateDestroy) { + shared_ptr messenger(CreateMessenger("TestCreateDestroy", 1, GetParam())); LOG(INFO) << "started messenger " << messenger->name(); messenger->Shutdown(); } @@ -77,10 +80,10 @@ TEST_F(TestRpc, TestMessengerCreateDestroy) { // test for a segfault seen in early versions of the RPC code, // in which shutting down the acceptor would trigger an assert, // making our tests flaky. -TEST_F(TestRpc, TestAcceptorPoolStartStop) { +TEST_P(TestRpc, TestAcceptorPoolStartStop) { int n_iters = AllowSlowTests() ? 100 : 5; for (int i = 0; i < n_iters; i++) { - shared_ptr messenger(CreateMessenger("TestAcceptorPoolStartStop")); + shared_ptr messenger(CreateMessenger("TestAcceptorPoolStartStop", 1, GetParam())); shared_ptr pool; ASSERT_OK(messenger->AddAcceptorPool(Sockaddr(), &pool)); Sockaddr bound_addr; @@ -100,14 +103,15 @@ TEST_F(TestRpc, TestConnHeaderValidation) { } // Test making successful RPC calls. -TEST_F(TestRpc, TestCall) { +TEST_P(TestRpc, TestCall) { // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client. LOG(INFO) << "Connecting to " << server_addr.ToString(); - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@" "{remote=$0, user_credentials=", @@ -119,8 +123,8 @@ TEST_F(TestRpc, TestCall) { } // Test that connecting to an invalid server properly throws an error. -TEST_F(TestRpc, TestCallToBadServer) { - shared_ptr client_messenger(CreateMessenger("Client")); +TEST_P(TestRpc, TestCallToBadServer) { + shared_ptr client_messenger(CreateMessenger("Client", 1, GetParam())); Sockaddr addr; addr.set_port(0); Proxy p(client_messenger, addr, GenericCalculatorService::static_service_name()); @@ -135,14 +139,15 @@ TEST_F(TestRpc, TestCallToBadServer) { } // Test that RPC calls can be failed with an error status on the server. -TEST_F(TestRpc, TestInvalidMethodCall) { +TEST_P(TestRpc, TestInvalidMethodCall) { // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client. LOG(INFO) << "Connecting to " << server_addr.ToString(); - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); // Call the method which fails. @@ -153,13 +158,14 @@ TEST_F(TestRpc, TestInvalidMethodCall) { // Test that the error message returned when connecting to the wrong service // is reasonable. -TEST_F(TestRpc, TestWrongService) { +TEST_P(TestRpc, TestWrongService) { // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client with the wrong service name. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, "WrongServiceName"); // Call the method which fails. @@ -180,7 +186,7 @@ int GetOpenFileLimit() { // Test that we can still make RPC connections even if many fds are in use. // This is a regression test for KUDU-650. -TEST_F(TestRpc, TestHighFDs) { +TEST_P(TestRpc, TestHighFDs) { // This test can only run if ulimit is set high. const int kNumFakeFiles = 3500; const int kMinUlimit = kNumFakeFiles + 100; @@ -200,14 +206,15 @@ TEST_F(TestRpc, TestHighFDs) { // Set up server and client, and verify we can make a successful call. Sockaddr server_addr; - StartTestServer(&server_addr); - shared_ptr client_messenger(CreateMessenger("Client")); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); } // Test that connections are kept alive between calls. -TEST_F(TestRpc, TestConnectionKeepalive) { +TEST_P(TestRpc, TestConnectionKeepalive) { // Only run one reactor per messenger, so we can grab the metrics from that // one without having to check all. n_server_reactor_threads_ = 1; @@ -215,11 +222,12 @@ TEST_F(TestRpc, TestConnectionKeepalive) { // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client. LOG(INFO) << "Connecting to " << server_addr.ToString(); - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); @@ -251,16 +259,17 @@ TEST_F(TestRpc, TestConnectionKeepalive) { // Test that a call which takes longer than the keepalive time // succeeds -- i.e that we don't consider a connection to be "idle" on the // server if there is a call outstanding on it. -TEST_F(TestRpc, TestCallLongerThanKeepalive) { +TEST_P(TestRpc, TestCallLongerThanKeepalive) { // set very short keepalive keepalive_time_ms_ = 50; // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); // Make a call which sleeps longer than the keepalive. @@ -274,13 +283,14 @@ TEST_F(TestRpc, TestCallLongerThanKeepalive) { } // Test that the RpcSidecar transfers the expected messages. -TEST_F(TestRpc, TestRpcSidecar) { +TEST_P(TestRpc, TestRpcSidecar) { // Set up server. Sockaddr server_addr; - StartTestServer(&server_addr); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, GetParam())); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); // Test some small sidecars @@ -292,10 +302,11 @@ TEST_F(TestRpc, TestRpcSidecar) { } // Test that timeouts are properly handled. -TEST_F(TestRpc, TestCallTimeout) { +TEST_P(TestRpc, TestCallTimeout) { Sockaddr server_addr; - StartTestServer(&server_addr); - shared_ptr client_messenger(CreateMessenger("Client")); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); // Test a very short timeout - we expect this will time out while the @@ -319,10 +330,11 @@ TEST_F(TestRpc, TestCallTimeout) { // This is a regression test against prior behavior where the connection negotiation // was assigned the timeout of the first call on that connection. So, if the first // call had a short timeout, the later call would also inherit the timed-out negotiation. -TEST_F(TestRpc, TestCallTimeoutDoesntAffectNegotiation) { +TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) { Sockaddr server_addr; - StartTestServer(&server_addr); - shared_ptr client_messenger(CreateMessenger("Client")); + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); FLAGS_rpc_negotiation_inject_delay_ms = 500; @@ -457,16 +469,17 @@ TEST_F(TestRpc, TestServerShutsDown) { } // Test handler latency metric. -TEST_F(TestRpc, TestRpcHandlerLatencyMetric) { +TEST_P(TestRpc, TestRpcHandlerLatencyMetric) { const uint64_t sleep_micros = 20 * 1000; // Set up server. Sockaddr server_addr; - StartTestServerWithGeneratedCode(&server_addr); + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); RpcController controller; @@ -503,8 +516,8 @@ static void DestroyMessengerCallback(shared_ptr* messenger, latch->CountDown(); } -TEST_F(TestRpc, TestRpcCallbackDestroysMessenger) { - shared_ptr client_messenger(CreateMessenger("Client")); +TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) { + shared_ptr client_messenger(CreateMessenger("Client", 1, GetParam())); Sockaddr bad_addr; CountDownLatch latch(1); @@ -524,15 +537,16 @@ TEST_F(TestRpc, TestRpcCallbackDestroysMessenger) { // Test that setting the client timeout / deadline gets propagated to RPC // services. -TEST_F(TestRpc, TestRpcContextClientDeadline) { +TEST_P(TestRpc, TestRpcContextClientDeadline) { const uint64_t sleep_micros = 20 * 1000; // Set up server. Sockaddr server_addr; - StartTestServerWithGeneratedCode(&server_addr); + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); SleepRequestPB req; @@ -551,13 +565,14 @@ TEST_F(TestRpc, TestRpcContextClientDeadline) { // Test that setting an call-level application feature flag to an unknown value // will make the server reject the call. -TEST_F(TestRpc, TestApplicationFeatureFlag) { +TEST_P(TestRpc, TestApplicationFeatureFlag) { // Set up server. Sockaddr server_addr; - StartTestServerWithGeneratedCode(&server_addr); + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); { // Supported flag @@ -587,17 +602,18 @@ TEST_F(TestRpc, TestApplicationFeatureFlag) { } } -TEST_F(TestRpc, TestApplicationFeatureFlagUnsupportedServer) { +TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) { auto savedFlags = kSupportedServerRpcFeatureFlags; auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; }); kSupportedServerRpcFeatureFlags = {}; // Set up server. Sockaddr server_addr; - StartTestServerWithGeneratedCode(&server_addr); + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); // Set up client. - shared_ptr client_messenger(CreateMessenger("Client")); + shared_ptr client_messenger(CreateMessenger("Client", 1, enable_ssl)); Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); { // Required flag http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/sasl_client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_client.cc b/src/kudu/rpc/sasl_client.cc index 53c5e4f..99cb822 100644 --- a/src/kudu/rpc/sasl_client.cc +++ b/src/kudu/rpc/sasl_client.cc @@ -83,9 +83,9 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) { } } -SaslClient::SaslClient(string app_name, int fd) +SaslClient::SaslClient(string app_name, Socket* socket) : app_name_(std::move(app_name)), - sock_(fd), + sock_(socket), helper_(SaslHelper::CLIENT), client_state_(SaslNegotiationState::NEW), negotiated_mech_(SaslMechanism::INVALID), @@ -99,10 +99,6 @@ SaslClient::SaslClient(string app_name, int fd) callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)); } -SaslClient::~SaslClient() { - sock_.Release(); // Do not close the underlying socket when this object is destroyed. -} - Status SaslClient::EnableAnonymous() { DCHECK_EQ(client_state_, SaslNegotiationState::NEW); return helper_.EnableAnonymous(); @@ -199,7 +195,7 @@ Status SaslClient::Negotiate() { } // Ensure we can use blocking calls on the socket during negotiation. - RETURN_NOT_OK(EnsureBlockingMode(&sock_)); + RETURN_NOT_OK(EnsureBlockingMode(sock_)); // Start by asking the server for a list of available auth mechanisms. RETURN_NOT_OK(SendNegotiateMessage()); @@ -213,7 +209,7 @@ Status SaslClient::Negotiate() { while (!nego_ok_ || nego_response_expected_) { ResponseHeader header; Slice param_buf; - RETURN_NOT_OK(ReceiveFramedMessageBlocking(&sock_, &recv_buf, &header, ¶m_buf, deadline_)); + RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, ¶m_buf, deadline_)); nego_response_expected_ = false; SaslMessagePB response; @@ -257,7 +253,7 @@ Status SaslClient::SendSaslMessage(const SaslMessagePB& msg) { // Create header with SASL-specific callId RequestHeader header; header.set_call_id(kSaslCallId); - return helper_.SendSaslMessage(&sock_, header, msg, deadline_); + return helper_.SendSaslMessage(sock_, header, msg, deadline_); } Status SaslClient::ParseSaslMsgResponse(const ResponseHeader& header, const Slice& param_buf, http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/sasl_client.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_client.h b/src/kudu/rpc/sasl_client.h index e9a9386..f8478c3 100644 --- a/src/kudu/rpc/sasl_client.h +++ b/src/kudu/rpc/sasl_client.h @@ -46,8 +46,7 @@ class SaslMessagePB_SaslAuth; class SaslClient { public: // Does not take ownership of the socket indicated by the fd. - SaslClient(string app_name, int fd); - ~SaslClient(); + SaslClient(string app_name, Socket* socket); // Enable ANONYMOUS authentication. // Must be called after Init(). @@ -149,7 +148,7 @@ class SaslClient { Status ParseError(const Slice& err_data); string app_name_; - Socket sock_; + Socket* sock_; std::vector callbacks_; // The SASL connection object. This is initialized in Init() and // freed after Negotiate() completes (regardless whether it was successful). http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/sasl_rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_rpc-test.cc b/src/kudu/rpc/sasl_rpc-test.cc index 2786d89..5861f7b 100644 --- a/src/kudu/rpc/sasl_rpc-test.cc +++ b/src/kudu/rpc/sasl_rpc-test.cc @@ -55,10 +55,10 @@ class TestSaslRpc : public RpcTestBase { // Test basic initialization of the objects. TEST_F(TestSaslRpc, TestBasicInit) { - SaslServer server(kSaslAppName, -1); + SaslServer server(kSaslAppName, nullptr); server.EnableAnonymous(); ASSERT_OK(server.Init(kSaslAppName)); - SaslClient client(kSaslAppName, -1); + SaslClient client(kSaslAppName, nullptr); client.EnableAnonymous(); ASSERT_OK(client.Init(kSaslAppName)); } @@ -107,14 +107,14 @@ static void RunNegotiationTest(const SocketCallable& server_runner, //////////////////////////////////////////////////////////////////////////////// static void RunAnonNegotiationServer(Socket* conn) { - SaslServer sasl_server(kSaslAppName, conn->GetFd()); + SaslServer sasl_server(kSaslAppName, conn); CHECK_OK(sasl_server.EnableAnonymous()); CHECK_OK(sasl_server.Init(kSaslAppName)); CHECK_OK(sasl_server.Negotiate()); } static void RunAnonNegotiationClient(Socket* conn) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); CHECK_OK(sasl_client.EnableAnonymous()); CHECK_OK(sasl_client.Init(kSaslAppName)); CHECK_OK(sasl_client.Negotiate()); @@ -128,7 +128,7 @@ TEST_F(TestSaslRpc, TestAnonNegotiation) { //////////////////////////////////////////////////////////////////////////////// static void RunPlainNegotiationServer(Socket* conn) { - SaslServer sasl_server(kSaslAppName, conn->GetFd()); + SaslServer sasl_server(kSaslAppName, conn); CHECK_OK(sasl_server.EnablePlain()); CHECK_OK(sasl_server.Init(kSaslAppName)); CHECK_OK(sasl_server.Negotiate()); @@ -137,7 +137,7 @@ static void RunPlainNegotiationServer(Socket* conn) { } static void RunPlainNegotiationClient(Socket* conn) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); CHECK_OK(sasl_client.EnablePlain("my-username", "ignored password")); CHECK_OK(sasl_client.Init(kSaslAppName)); CHECK_OK(sasl_client.Negotiate()); @@ -160,7 +160,7 @@ using CheckerFunction = std::function; static void RunGSSAPINegotiationServer( Socket* conn, const CheckerFunction& post_check) { - SaslServer sasl_server(kSaslAppName, conn->GetFd()); + SaslServer sasl_server(kSaslAppName, conn); sasl_server.set_server_fqdn("127.0.0.1"); CHECK_OK(sasl_server.EnableGSSAPI()); CHECK_OK(sasl_server.Init(kSaslAppName)); @@ -172,7 +172,7 @@ static void RunGSSAPINegotiationServer( static void RunGSSAPINegotiationClient( Socket* conn, const CheckerFunction& post_check) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); sasl_client.set_server_fqdn("127.0.0.1"); CHECK_OK(sasl_client.EnableGSSAPI()); CHECK_OK(sasl_client.Init(kSaslAppName)); @@ -206,7 +206,7 @@ TEST_F(TestSaslRpc, TestRestrictiveServer_NonRestrictiveClient) { CHECK_EQ("testuser", server.authenticated_user()); }), [](Socket* conn) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); sasl_client.set_server_fqdn("127.0.0.1"); // The client enables both PLAIN and GSSAPI. CHECK_OK(sasl_client.EnablePlain("foo", "bar")); @@ -241,7 +241,7 @@ TEST_F(TestSaslRpc, TestNoMatchingMechanisms) { ASSERT_STR_CONTAINS(s.ToString(), "got EOF from remote"); }), [](Socket* conn) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); sasl_client.set_server_fqdn("127.0.0.1"); // The client enables both PLAIN and GSSAPI. CHECK_OK(sasl_client.EnablePlain("foo", "bar")); @@ -373,7 +373,7 @@ TEST_F(TestSaslRpc, TestPreflight) { //////////////////////////////////////////////////////////////////////////////// static void RunTimeoutExpectingServer(Socket* conn) { - SaslServer sasl_server(kSaslAppName, conn->GetFd()); + SaslServer sasl_server(kSaslAppName, conn); CHECK_OK(sasl_server.EnableAnonymous()); CHECK_OK(sasl_server.Init(kSaslAppName)); Status s = sasl_server.Negotiate(); @@ -382,7 +382,7 @@ static void RunTimeoutExpectingServer(Socket* conn) { } static void RunTimeoutNegotiationClient(Socket* sock) { - SaslClient sasl_client(kSaslAppName, sock->GetFd()); + SaslClient sasl_client(kSaslAppName, sock); CHECK_OK(sasl_client.EnableAnonymous()); CHECK_OK(sasl_client.Init(kSaslAppName)); MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L); @@ -400,7 +400,7 @@ TEST_F(TestSaslRpc, TestClientTimeout) { //////////////////////////////////////////////////////////////////////////////// static void RunTimeoutNegotiationServer(Socket* sock) { - SaslServer sasl_server(kSaslAppName, sock->GetFd()); + SaslServer sasl_server(kSaslAppName, sock); CHECK_OK(sasl_server.EnableAnonymous()); CHECK_OK(sasl_server.Init(kSaslAppName)); MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L); @@ -411,7 +411,7 @@ static void RunTimeoutNegotiationServer(Socket* sock) { } static void RunTimeoutExpectingClient(Socket* conn) { - SaslClient sasl_client(kSaslAppName, conn->GetFd()); + SaslClient sasl_client(kSaslAppName, conn); CHECK_OK(sasl_client.EnableAnonymous()); CHECK_OK(sasl_client.Init(kSaslAppName)); Status s = sasl_client.Negotiate(); http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/sasl_server.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_server.cc b/src/kudu/rpc/sasl_server.cc index 9a6523e..f830dae 100644 --- a/src/kudu/rpc/sasl_server.cc +++ b/src/kudu/rpc/sasl_server.cc @@ -51,9 +51,9 @@ static int SaslServerPlainAuthCb(sasl_conn_t *conn, void *sasl_server, const cha ->PlainAuthCb(conn, user, pass, passlen, propctx); } -SaslServer::SaslServer(string app_name, int fd) +SaslServer::SaslServer(string app_name, Socket* socket) : app_name_(std::move(app_name)), - sock_(fd), + sock_(socket), helper_(SaslHelper::SERVER), server_state_(SaslNegotiationState::NEW), negotiated_mech_(SaslMechanism::INVALID), @@ -65,10 +65,6 @@ SaslServer::SaslServer(string app_name, int fd) callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)); } -SaslServer::~SaslServer() { - sock_.Release(); // Do not close the underlying socket when this object is destroyed. -} - Status SaslServer::EnableAnonymous() { DCHECK_EQ(server_state_, SaslNegotiationState::NEW); return helper_.EnableAnonymous(); @@ -173,7 +169,7 @@ Status SaslServer::Negotiate() { } // Ensure we can use blocking calls on the socket during negotiation. - RETURN_NOT_OK(EnsureBlockingMode(&sock_)); + RETURN_NOT_OK(EnsureBlockingMode(sock_)); faststring recv_buf; @@ -185,7 +181,7 @@ Status SaslServer::Negotiate() { TRACE("Waiting for next SASL message..."); RequestHeader header; Slice param_buf; - RETURN_NOT_OK(ReceiveFramedMessageBlocking(&sock_, &recv_buf, &header, ¶m_buf, deadline_)); + RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, ¶m_buf, deadline_)); SaslMessagePB request; RETURN_NOT_OK(ParseSaslMsgRequest(header, param_buf, &request)); @@ -235,7 +231,7 @@ Status SaslServer::ValidateConnectionHeader(faststring* recv_buf) { size_t num_read; const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength; recv_buf->resize(conn_header_len); - RETURN_NOT_OK(sock_.BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_)); + RETURN_NOT_OK(sock_->BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_)); DCHECK_EQ(conn_header_len, num_read); RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf)); @@ -268,7 +264,7 @@ Status SaslServer::SendSaslMessage(const SaslMessagePB& msg) { // Create header with SASL-specific callId ResponseHeader header; header.set_call_id(kSaslCallId); - return helper_.SendSaslMessage(&sock_, header, msg, deadline_); + return helper_.SendSaslMessage(sock_, header, msg, deadline_); } Status SaslServer::SendSaslError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) { @@ -290,7 +286,7 @@ Status SaslServer::SendSaslError(ErrorStatusPB::RpcErrorCodePB code, const Statu msg.set_code(code); msg.set_message(err.ToString()); - RETURN_NOT_OK(helper_.SendSaslMessage(&sock_, header, msg, deadline_)); + RETURN_NOT_OK(helper_.SendSaslMessage(sock_, header, msg, deadline_)); TRACE("Sent SASL error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/rpc/sasl_server.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_server.h b/src/kudu/rpc/sasl_server.h index a05725c..f26f229 100644 --- a/src/kudu/rpc/sasl_server.h +++ b/src/kudu/rpc/sasl_server.h @@ -43,9 +43,8 @@ using std::string; // Operations on this class are NOT thread-safe. class SaslServer { public: - // Does not take ownership of the socket indicated by the fd. - SaslServer(string app_name, int fd); - ~SaslServer(); + // Does not take ownership of 'socket'. + SaslServer(string app_name, Socket* socket); // Enable ANONYMOUS authentication. // Must be called after Init(). @@ -150,7 +149,7 @@ class SaslServer { Status HandleResponseRequest(const SaslMessagePB& request); string app_name_; - Socket sock_; + Socket* sock_; std::vector callbacks_; // The SASL connection object. This is initialized in Init() and // freed after Negotiate() completes (regardless whether it was successful). http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index 64847fe..667942d 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -101,6 +101,12 @@ else () set(SEMAPHORE_CC "semaphore.cc") endif() +# Fall back to using the ported functionality if we're using an older version of +# OpenSSL +if (${OPENSSL_VERSION} VERSION_LESS "1.0.2") + set(PORTED_X509_CHECK_HOST_CC "x509_check_host.cc") +endif() + set(UTIL_SRCS atomic.cc bitmap.cc @@ -146,12 +152,15 @@ set(UTIL_SRCS net/net_util.cc net/sockaddr.cc net/socket.cc + net/ssl_factory.cc + net/ssl_socket.cc oid_generator.cc once.cc os-util.cc path_util.cc pb_util.cc pb_util-internal.cc + ${PORTED_X509_CHECK_HOST_CC} random_util.cc resettable_heartbeater.cc rolling_log.cc @@ -200,6 +209,8 @@ set(UTIL_LIBS histogram_proto libev maintenance_manager_proto + openssl_crypto + openssl_ssl pb_util_proto protobuf version_info_proto http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/net_util-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/net_util-test.cc b/src/kudu/util/net/net_util-test.cc index b1c33ef..e7bb6c0 100644 --- a/src/kudu/util/net/net_util-test.cc +++ b/src/kudu/util/net/net_util-test.cc @@ -26,6 +26,7 @@ #include "kudu/util/net/net_util.h" #include "kudu/util/net/socket.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/ssl_socket.h" #include "kudu/util/status.h" #include "kudu/util/test_util.h" http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/socket.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc index 5d68a51..82b6601 100644 --- a/src/kudu/util/net/socket.cc +++ b/src/kudu/util/net/socket.cc @@ -169,6 +169,19 @@ Status Socket::SetNoDelay(bool enabled) { return Status::OK(); } +Status Socket::SetTcpCork(bool enabled) { +#if defined(__linux__) + int flag = enabled ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) == -1) { + int err = errno; + return Status::NetworkError(std::string("failed to set TCP_CORK: ") + + ErrnoToString(err), Slice(), err); + } +#endif // defined(__linux__) + // TODO: Use TCP_NOPUSH for OSX if perf becomes an issue. + return Status::OK(); +} + Status Socket::SetNonBlocking(bool enabled) { int curflags = ::fcntl(fd_, F_GETFL, 0); if (curflags == -1) { http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/socket.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h index 99af6c6..b3d7eea 100644 --- a/src/kudu/util/net/socket.h +++ b/src/kudu/util/net/socket.h @@ -40,10 +40,10 @@ class Socket { explicit Socket(int fd); // Close the socket. Errors will be ignored. - ~Socket(); + virtual ~Socket(); // Close the Socket, checking for errors. - Status Close(); + virtual Status Close(); // call shutdown() on the socket Status Shutdown(bool shut_read, bool shut_write); @@ -67,6 +67,9 @@ class Socket { // Set or clear TCP_NODELAY Status SetNoDelay(bool enabled); + // Set or clear TCP_CORK + Status SetTcpCork(bool enabled); + // Set or clear O_NONBLOCK Status SetNonBlocking(bool enabled); Status IsNonBlocking(bool* is_nonblock) const; @@ -110,19 +113,19 @@ class Socket { // get the error status using getsockopt(2) Status GetSockError() const; - Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten); + virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten); - Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten); + virtual Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten); // Blocking Write call, returns IOError unless full buffer is sent. // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes. // Returns OK if buflen bytes were sent, otherwise IOError. - // Upon return, num_written will contain the number of bytes actually written. + // Upon return, nwritten will contain the number of bytes actually written. // See also writen() from Stevens (2004) or Kerrisk (2010) - Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *num_written, + Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten, const MonoTime& deadline); - Status Recv(uint8_t *buf, int32_t amt, int32_t *nread); + virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread); // Blocking Recv call, returns IOError unless requested amt bytes are read. // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes. http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/ssl_factory.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/ssl_factory.cc b/src/kudu/util/net/ssl_factory.cc new file mode 100644 index 0000000..68aa963 --- /dev/null +++ b/src/kudu/util/net/ssl_factory.cc @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include +#include +#include +#include + +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/mutex.h" +#include "kudu/util/thread.h" +#include "kudu/util/net/ssl_factory.h" +#include "kudu/util/net/ssl_socket.h" + +namespace kudu { + +// These non-POD elements will be alive for the lifetime of the process, so don't allocate in +// static storage. +static std::vector ssl_mutexes; + +// Lock/Unlock the nth lock. Only to be used by OpenSSL. +static void CryptoLockingCallback(int mode, int n, const char* /*unused*/, int /*unused*/) { + if (mode & CRYPTO_LOCK) { + ssl_mutexes[n]->Acquire(); + } else { + ssl_mutexes[n]->Release(); + } +} + +// Return the current pthread's tid. Only to be used by OpenSSL. +static void CryptoThreadIDCallback(CRYPTO_THREADID* id) { + return CRYPTO_THREADID_set_numeric(id, Thread::UniqueThreadId()); +} + +void DoSSLInit() { + SSL_library_init(); + SSL_load_error_strings(); + OpenSSL_add_all_algorithms(); + RAND_poll(); + + for (int i = 0; i < CRYPTO_num_locks(); ++i) { + debug::ScopedLeakCheckDisabler d; + ssl_mutexes.push_back(new Mutex()); + } + + // Callbacks used by OpenSSL required in a multi-threaded setting. + CRYPTO_set_locking_callback(CryptoLockingCallback); + CRYPTO_THREADID_set_callback(CryptoThreadIDCallback); +} + +SSLFactory::SSLFactory() : ctx_(nullptr, SSL_CTX_free) { + static std::once_flag ssl_once; + std::call_once(ssl_once, DoSSLInit); +} + +SSLFactory::~SSLFactory() { +} + +Status SSLFactory::Init() { + CHECK(!ctx_.get()); + ctx_.reset(SSL_CTX_new(TLSv1_2_method())); + if (ctx_ == nullptr) { + return Status::RuntimeError("Could not create SSL context"); + } + SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY); + SSL_CTX_set_options(ctx_.get(), + SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1); + SSL_CTX_set_verify(ctx_.get(), + SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT | SSL_VERIFY_CLIENT_ONCE, nullptr); + return Status::OK(); +} + +std::string SSLFactory::GetLastError(int errno_copy) { + int error_code = ERR_get_error(); + if (error_code == 0) return kudu::ErrnoToString(errno_copy); + const char* error_reason = ERR_reason_error_string(error_code); + if (error_reason != NULL) return error_reason; + return strings::Substitute("SSL error $0", error_code); +} + +Status SSLFactory::LoadCertificate(const std::string& certificate_path) { + ERR_clear_error(); + errno = 0; + if (SSL_CTX_use_certificate_file(ctx_.get(), certificate_path.c_str(), SSL_FILETYPE_PEM) != 1) { + return Status::NotFound( + "Failed to load certificate file '" + certificate_path + "': " + GetLastError(errno)); + } + return Status::OK(); +} + +Status SSLFactory::LoadPrivateKey(const std::string& key_path) { + ERR_clear_error(); + errno = 0; + if (SSL_CTX_use_PrivateKey_file(ctx_.get(), key_path.c_str(), SSL_FILETYPE_PEM) != 1) { + return Status::NotFound( + "Failed to load private key file '" + key_path + "': " + GetLastError(errno)); + } + return Status::OK(); +} + +Status SSLFactory::LoadCertificateAuthority(const std::string& certificate_path) { + ERR_clear_error(); + errno = 0; + if (SSL_CTX_load_verify_locations(ctx_.get(), certificate_path.c_str(), nullptr) != 1) { + return Status::NotFound( + "Failed to load certificate authority file '" + certificate_path + "': " + + GetLastError(errno)); + } + return Status::OK(); +} + +std::unique_ptr SSLFactory::CreateSocket(int socket_fd, bool is_server) { + CHECK(ctx_); + // Create SSL object and transfer ownership to the SSLSocket object created. + SSL* ssl = SSL_new(ctx_.get()); + if (ssl == nullptr) { + return nullptr; + } + std::unique_ptr socket(new SSLSocket(socket_fd, ssl, is_server)); + return socket; + //return new SSLSocket(socket_fd, ssl, is_server); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/ssl_factory.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/ssl_factory.h b/src/kudu/util/net/ssl_factory.h new file mode 100644 index 0000000..8fb0147 --- /dev/null +++ b/src/kudu/util/net/ssl_factory.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/errno.h" +#include "kudu/util/status.h" + +struct ssl_ctx_st; +typedef ssl_ctx_st SSL_CTX; + +namespace kudu { + +class Sockaddr; +class SSLSocket; + +class SSLFactory { + public: + SSLFactory(); + + ~SSLFactory(); + + // Set up the SSL_CTX and choose encryption preferences. + Status Init(); + + // Load the server certificate. + Status LoadCertificate(const std::string& certificate_path); + + // Load the private key for the server certificate. + Status LoadPrivateKey(const std::string& key_path); + + // Load the certificate authority. + Status LoadCertificateAuthority(const std::string& certificate_path); + + // Create an SSLSocket wrapped around the file descriptor 'socket_fd'. 'is_server' denotes if it's + // a server socket. The 'socket_fd' is closed when this object is destroyed. + std::unique_ptr CreateSocket(int socket_fd, bool is_server); + + private: + friend class SSLSocket; + std::unique_ptr> ctx_; + + // Gets the last error from the thread local SSL error queue. If no error exists, it returns + // the error corresponding to 'errno_copy'. + static std::string GetLastError(int errno_copy); +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/ssl_socket.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/ssl_socket.cc b/src/kudu/util/net/ssl_socket.cc new file mode 100644 index 0000000..c178ab4 --- /dev/null +++ b/src/kudu/util/net/ssl_socket.cc @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include +#include +#include + +#include "kudu/gutil/strings/util.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/util/errno.h" +#include "kudu/util/net/ssl_factory.h" +#include "kudu/util/net/ssl_socket.h" +#include "kudu/util/net/sockaddr.h" + +#if OPENSSL_VERSION_NUMBER < 0x10002000L +#include "kudu/util/x509_check_host.h" +#endif // OPENSSL_VERSION_NUMBER + +namespace kudu { + +SSLSocket::SSLSocket(int fd, SSL* ssl, bool is_server) : + Socket(fd), ssl_(ssl), is_server_(is_server) { + SSL_set_fd(ssl_, fd); +} + +SSLSocket::~SSLSocket() { +} + +Status SSLSocket::DoHandshake() { + CHECK(ssl_); + ERR_clear_error(); + errno = 0; + int ret; + if (is_server_) { + ret = SSL_accept(ssl_); + } else { + ret = SSL_connect(ssl_); + } + if (ret <= 0) return Status::NetworkError(SSLFactory::GetLastError(errno)); + + // Verify if the handshake was successful. + int rc = SSL_get_verify_result(ssl_); + if (rc != X509_V_OK) { + return Status::NetworkError("SSL_get_verify_result()", X509_verify_cert_error_string(rc)); + } + + // Get the peer certificate. + std::unique_ptr cert( + SSL_get_peer_certificate(ssl_), [](X509* x) { X509_free(x); }); + cert.reset(SSL_get_peer_certificate(ssl_)); + if (cert == nullptr) { + if (SSL_get_verify_mode(ssl_) & SSL_VERIFY_FAIL_IF_NO_PEER_CERT) { + return Status::NetworkError("Handshake failed: Could not retreive peer certificate"); + } + } + + // Get the peer's hostname + Sockaddr peer_addr; + if (!GetPeerAddress(&peer_addr).ok()) { + return Status::NetworkError("Handshake failed: Could not retrieve peer address"); + } + std::string peer_hostname; + RETURN_NOT_OK(peer_addr.LookupHostname(&peer_hostname)); + + // Check if the hostname matches with either the Common Name or any of the Subject Alternative + // Names of the certificate. + int match; + if ((match = X509_check_host( + cert.get(), peer_hostname.c_str(), peer_hostname.length(), 0, nullptr)) == 0) { + return Status::NetworkError("Handshake failed: Could not verify host with certificate"); + } + if (match < 0) { + return Status::NetworkError("Handshake failed:", SSLFactory::GetLastError(errno)); + } + CHECK(match == 1); + return Status::OK(); +} + +Status SSLSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) { + CHECK(ssl_); + ERR_clear_error(); + errno = 0; + int32_t bytes_written = SSL_write(ssl_, buf, amt); + if (bytes_written <= 0) { + if (SSL_get_error(ssl_, bytes_written) == SSL_ERROR_WANT_WRITE) { + // Socket not ready to write yet. + *nwritten = 0; + return Status::OK(); + } + return Status::NetworkError("SSL_write", SSLFactory::GetLastError(errno)); + } + *nwritten = bytes_written; + return Status::OK(); +} + +Status SSLSocket::Writev(const struct ::iovec *iov, int iov_len, + int32_t *nwritten) { + CHECK(ssl_); + ERR_clear_error(); + int32_t total_written = 0; + // Allows packets to be aggresively be accumulated before sending. + RETURN_NOT_OK(SetTcpCork(1)); + Status write_status = Status::OK(); + for (int i = 0; i < iov_len; ++i) { + int32_t frame_size = iov[i].iov_len; + // Don't return before unsetting TCP_CORK. + write_status = Write(static_cast(iov[i].iov_base), frame_size, nwritten); + total_written += *nwritten; + if (*nwritten < frame_size) break; + } + RETURN_NOT_OK(SetTcpCork(0)); + *nwritten = total_written; + return write_status; +} + +Status SSLSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) { + CHECK(ssl_); + ERR_clear_error(); + errno = 0; + int32_t bytes_read = SSL_read(ssl_, buf, amt); + if (bytes_read <= 0) { + if (bytes_read == 0 && SSL_get_shutdown(ssl_) == SSL_RECEIVED_SHUTDOWN) { + return Status::NetworkError("SSLSocket::Recv() for EOF from remote", Slice(), ESHUTDOWN); + } + if (SSL_get_error(ssl_, bytes_read) == SSL_ERROR_WANT_READ) { + // Nothing available to read yet. + *nread = 0; + return Status::OK(); + } + return Status::NetworkError("SSL_read", SSLFactory::GetLastError(errno)); + } + *nread = bytes_read; + return Status::OK(); +} + +Status SSLSocket::Close() { + CHECK(ssl_); + ERR_clear_error(); + errno = 0; + int32_t ret = SSL_shutdown(ssl_); + Status shutdown_status; + if (ret < 0 && errno != EAGAIN) { + // We still need to close the underlying socket, so don't return just yet. + shutdown_status = Status::NetworkError("SSL_Shutdown", SSLFactory::GetLastError(errno)); + } + SSL_free(ssl_); + ssl_ = nullptr; + ERR_remove_state(0); + + Status close_status = Socket::Close(); + if (!close_status.ok()) return close_status.CloneAndPrepend(shutdown_status.message()); + return shutdown_status; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/bd436125/src/kudu/util/net/ssl_socket.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/net/ssl_socket.h b/src/kudu/util/net/ssl_socket.h new file mode 100644 index 0000000..fd67cfe --- /dev/null +++ b/src/kudu/util/net/ssl_socket.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_NET_SSL_SOCKET_H +#define KUDU_UTIL_NET_SSL_SOCKET_H + +#include + +#include + +#include "kudu/gutil/macros.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +struct ssl_st; +typedef ssl_st SSL; + +namespace kudu { + +class Sockaddr; + +class SSLSocket : public Socket { + public: + SSLSocket(int fd, SSL* ssl, bool is_server); + + ~SSLSocket(); + + // Do the SSL handshake as a client or a server and verify that the credentials were correctly + // verified. + Status DoHandshake(); + + Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) override; + + Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten) override; + + Status Recv(uint8_t *buf, int32_t amt, int32_t *nread) override; + + // Shutdown the connection and free the SSL state for this connection. + Status Close() override; + private: + SSL* ssl_; // Owned. + bool is_server_; +}; + +} // namespace kudu + +#endif