From commits-return-83977-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Mar 12 12:45:05 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 32952180787 for ; Tue, 12 Mar 2019 13:45:00 +0100 (CET) Received: (qmail 15439 invoked by uid 500); 12 Mar 2019 12:44:53 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 12210 invoked by uid 99); 12 Mar 2019 12:44:51 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Mar 2019 12:44:51 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2C57A879D7; Tue, 12 Mar 2019 12:44:50 +0000 (UTC) Date: Tue, 12 Mar 2019 12:45:36 +0000 To: "commits@hbase.apache.org" Subject: [hbase] 48/133: HBASE-17051 [C++] implement RPC client and connection management (Xiaobing Zhou) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zghao@apache.org In-Reply-To: <155239468804.28129.10081236749289850749@gitbox.apache.org> References: <155239468804.28129.10081236749289850749@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/HBASE-14850 X-Git-Reftype: branch X-Git-Rev: dd23eae3b2bc678c8015d3c307bded286d1cf4f0 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190312124450.2C57A879D7@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git commit dd23eae3b2bc678c8015d3c307bded286d1cf4f0 Author: Enis Soztutar AuthorDate: Mon Dec 19 18:38:51 2016 -0800 HBASE-17051 [C++] implement RPC client and connection management (Xiaobing Zhou) --- hbase-native-client/Makefile | 2 +- hbase-native-client/connection/BUCK | 3 + hbase-native-client/connection/connection-id.h | 88 +++++++++++++++ .../connection/connection-pool-test.cc | 35 +++--- hbase-native-client/connection/connection-pool.cc | 47 ++++---- hbase-native-client/connection/connection-pool.h | 50 ++++----- hbase-native-client/connection/rpc-client.cc | 119 +++++++++++++++++++++ hbase-native-client/connection/rpc-client.h | 116 ++++++++++++++++++++ hbase-native-client/connection/rpc-connection.h | 58 ++++++++++ hbase-native-client/core/location-cache.cc | 17 ++- hbase-native-client/{connection => security}/BUCK | 28 +---- hbase-native-client/security/user.h | 39 +++++++ 12 files changed, 504 insertions(+), 98 deletions(-) diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index be5d461..96c275e 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils +MODULES = connection core serde test-util utils security SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index f093d5a..c22cc89 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -24,8 +24,10 @@ cxx_library( "client-handler.h", "connection-factory.h", "connection-pool.h", + "connection-id.h", "pipeline.h", "request.h", + "rpc-connection.h", "response.h", "service.h", ], @@ -41,6 +43,7 @@ cxx_library( "//if:if", "//utils:utils", "//serde:serde", + "//security:security", "//third-party:folly", "//third-party:wangle", ], diff --git a/hbase-native-client/connection/connection-id.h b/hbase-native-client/connection/connection-id.h new file mode 100644 index 0000000..62fe222 --- /dev/null +++ b/hbase-native-client/connection/connection-id.h @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "if/HBase.pb.h" +#include "security/user.h" + +#include +#include +#include + +using hbase::pb::ServerName; +using hbase::security::User; + +namespace hbase { +class ConnectionId { + public: + ConnectionId(const std::string &host, uint16_t port) + : ConnectionId(host, port, User::defaultUser(), "") {} + + ConnectionId(const std::string &host, uint16_t port, + std::shared_ptr user) + : ConnectionId(host, port, user, "") {} + + ConnectionId(const std::string &host, uint16_t port, + std::shared_ptr user, const std::string &service_name) + : user_(user), service_name_(service_name), host_(host), port_(port) {} + + virtual ~ConnectionId() = default; + + std::shared_ptr user() const { return user_; } + std::string service_name() const { return service_name_; } + std::string host() { return host_; } + uint16_t port() { return port_; } + + private: + std::shared_ptr user_; + std::string service_name_; + std::string host_; + uint16_t port_; +}; + +/* Equals function for ConnectionId */ +struct ConnectionIdEquals { + /** equals */ + bool operator()(const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return userEquals(lhs->user(), rhs->user()) && lhs->host() == rhs->host() && + lhs->port() == rhs->port(); + } + + private: + bool userEquals(const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return lhs == nullptr ? rhs == nullptr + : (rhs == nullptr ? false : lhs->user_name() == + rhs->user_name()); + } +}; + +/** Hash for ConnectionId. */ +struct ConnectionIdHash { + /** hash */ + std::size_t operator()(const std::shared_ptr &ci) const { + std::size_t h = 0; + boost::hash_combine(h, ci->user() == nullptr ? 0 : ci->user()->user_name()); + boost::hash_combine(h, ci->host()); + boost::hash_combine(h, ci->port()); + return h; + } +}; +} // namespace hbase diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index e17c16c..4547b30 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -17,20 +17,22 @@ * */ +#include "connection/connection-id.h" #include "connection/connection-pool.h" - -#include -#include - #include "connection/connection-factory.h" + #include "if/HBase.pb.h" #include "serde/server-name.h" +#include +#include + using namespace hbase; using hbase::pb::ServerName; using ::testing::Return; using ::testing::_; +using hbase::ConnectionId; class MockConnectionFactory : public ConnectionFactory { public: @@ -75,13 +77,10 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { .WillRepeatedly(Return(mock_boot)); ConnectionPool cp{mock_cf}; - ServerName sn; - sn.set_host_name(hostname); - sn.set_port(port); - - auto result = cp.Get(sn); + auto remote_id = std::make_shared(hostname, port); + auto result = cp.GetConnection(remote_id); ASSERT_TRUE(result != nullptr); - result = cp.Get(sn); + result = cp.GetConnection(remote_id); } TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { @@ -102,13 +101,13 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { ConnectionPool cp{mock_cf}; { - auto result_one = cp.Get(folly::to( - hostname_one + ":" + folly::to(port))); - auto result_two = cp.Get(folly::to( - hostname_two + ":" + folly::to(port))); + auto remote_id = std::make_shared(hostname_one, port); + auto result_one = cp.GetConnection(remote_id); + auto remote_id2 = std::make_shared(hostname_two, port); + auto result_two = cp.GetConnection(remote_id2); } - auto result_one = cp.Get( - folly::to(hostname_one + ":" + folly::to(port))); - auto result_two = cp.Get( - folly::to(hostname_two + ":" + folly::to(port))); + auto remote_id = std::make_shared(hostname_one, port); + auto result_one = cp.GetConnection(remote_id); + auto remote_id2 = std::make_shared(hostname_two, port); + auto result_two = cp.GetConnection(remote_id2); } diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index c216d0b..e022f9e 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -28,7 +28,6 @@ using std::mutex; using std::unique_ptr; using std::shared_ptr; -using hbase::pb::ServerName; using hbase::ConnectionPool; using hbase::HBaseService; using folly::SharedMutexWritePriority; @@ -47,33 +46,36 @@ ConnectionPool::~ConnectionPool() { SharedMutexWritePriority::WriteHolder holder(map_mutex_); for (auto &item : connections_) { auto &con = item.second; - con->close(); + con->Close(); } connections_.clear(); clients_.clear(); } -std::shared_ptr ConnectionPool::Get(const ServerName &sn) { +std::shared_ptr ConnectionPool::GetConnection( + std::shared_ptr remote_id) { // Try and get th cached connection. - auto found_ptr = GetCached(sn); + auto found_ptr = GetCachedConnection(remote_id); // If there's no connection then create it. if (found_ptr == nullptr) { - found_ptr = GetNew(sn); + found_ptr = GetNewConnection(remote_id); } return found_ptr; } -std::shared_ptr ConnectionPool::GetCached(const ServerName &sn) { +std::shared_ptr ConnectionPool::GetCachedConnection( + std::shared_ptr remote_id) { SharedMutexWritePriority::ReadHolder holder(map_mutex_); - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end()) { return nullptr; } return found->second; } -std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { +std::shared_ptr ConnectionPool::GetNewConnection( + std::shared_ptr remote_id) { // Grab the upgrade lock. While we are double checking other readers can // continue on SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; @@ -81,7 +83,7 @@ std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { // Now check if someone else created the connection before we got the lock // This is safe since we hold the upgrade lock. // upgrade lock is more power than the reader lock. - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found != connections_.end() && found->second != nullptr) { return found->second; } else { @@ -89,24 +91,29 @@ std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; // Make double sure there are not stale connections hanging around. - connections_.erase(sn); - - // Nope we are the ones who should create the new connection. - auto client = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port()); - clients_.insert(std::make_pair(sn, client)); - connections_.insert(std::make_pair(sn, dispatcher)); - return dispatcher; + connections_.erase(remote_id); + + /* create new connection */ + auto clientBootstrap = cf_->MakeBootstrap(); + auto dispatcher = + cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); + + auto conneciton = std::make_shared(remote_id, dispatcher); + + connections_.insert(std::make_pair(remote_id, conneciton)); + clients_.insert(std::make_pair(remote_id, clientBootstrap)); + + return conneciton; } } -void ConnectionPool::Close(const ServerName &sn) { +void ConnectionPool::Close(std::shared_ptr remote_id) { SharedMutexWritePriority::WriteHolder holder{map_mutex_}; - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end() || found->second == nullptr) { return; } - auto service = found->second; + found->second->Close(); connections_.erase(found); } diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 2624336..96d89ac 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -25,38 +25,24 @@ #include #include "connection/connection-factory.h" +#include "connection/connection-id.h" +#include "connection/rpc-connection.h" #include "connection/service.h" #include "if/HBase.pb.h" -namespace hbase { -/** Equals function for server name that ignores start time */ -struct ServerNameEquals { - /** equals */ - bool operator()(const hbase::pb::ServerName &lhs, - const hbase::pb::ServerName &rhs) const { - return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); - } -}; +using hbase::ConnectionId; +using hbase::ConnectionIdEquals; +using hbase::ConnectionIdHash; +using hbase::RpcConnection; -/** Hash for ServerName that ignores the start time. */ -struct ServerNameHash { - /** hash */ - std::size_t operator()(hbase::pb::ServerName const &s) const { - std::size_t h = 0; - boost::hash_combine(h, s.host_name()); - boost::hash_combine(h, s.port()); - return h; - } -}; +namespace hbase { /** * @brief Connection pooling for HBase rpc connection. * * This is a thread safe connection pool. It allows getting - * a shared connection to HBase by server name. This is - * useful for keeping a single connection no matter how many regions a - * regionserver has on it. + * a shared rpc connection to HBase servers by connection id. */ class ConnectionPool { public: @@ -81,23 +67,27 @@ class ConnectionPool { * Get a connection to the server name. Start time is ignored. * This can be a blocking operation for a short time. */ - std::shared_ptr Get(const hbase::pb::ServerName &sn); + std::shared_ptr GetConnection( + std::shared_ptr remote_id); /** * Close/remove a connection. */ - void Close(const hbase::pb::ServerName &sn); + void Close(std::shared_ptr remote_id); private: - std::shared_ptr GetCached(const hbase::pb::ServerName &sn); - std::shared_ptr GetNew(const hbase::pb::ServerName &sn); - std::unordered_map, - ServerNameHash, ServerNameEquals> + std::shared_ptr GetCachedConnection( + std::shared_ptr remote_id); + std::shared_ptr GetNewConnection( + std::shared_ptr remote_id); + std::unordered_map, + std::shared_ptr, ConnectionIdHash, + ConnectionIdEquals> connections_; std::unordered_map< - hbase::pb::ServerName, + std::shared_ptr, std::shared_ptr>, - ServerNameHash, ServerNameEquals> + ConnectionIdHash, ConnectionIdEquals> clients_; folly::SharedMutexWritePriority map_mutex_; std::shared_ptr cf_; diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc new file mode 100644 index 0000000..66ec231 --- /dev/null +++ b/hbase-native-client/connection/rpc-client.cc @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection/rpc-client.h" +#include +#include + +using hbase::RpcClient; +using hbase::AbstractRpcChannel; + +namespace hbase { + +class RpcChannelImplementation : public AbstractRpcChannel { + public: + RpcChannelImplementation(std::shared_ptr rpc_client, + const std::string& host, uint16_t port, + std::shared_ptr ticket, int rpc_timeout) + : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} + + void CallMethod(const MethodDescriptor* method, RpcController* controller, + const Message* request, Message* response, + Closure* done) override { + rpc_client_->CallMethod(method, controller, request, response, done, host_, + port_, ticket_); + } +}; +} // namespace hbase + +RpcClient::RpcClient() { + auto io_executor = std::make_shared( + sysconf(_SC_NPROCESSORS_ONLN)); + + cp_ = std::make_shared(io_executor); +} + +void RpcClient::Close() {} + +std::shared_ptr RpcClient::SyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket) { + return std::make_shared( + AsyncCall(host, port, std::move(req), ticket).get()); +} + +std::shared_ptr RpcClient::SyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string& service_name) { + return std::make_shared( + AsyncCall(host, port, std::move(req), ticket, service_name).get()); +} + +folly::Future RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket) { + auto remote_id = std::make_shared(host, port, ticket); + return GetConnection(remote_id)->SendRequest(std::move(req)); +} + +folly::Future RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string& service_name) { + auto remote_id = + std::make_shared(host, port, ticket, service_name); + return GetConnection(remote_id)->SendRequest(std::move(req)); +} + +std::shared_ptr RpcClient::GetConnection( + std::shared_ptr remote_id) { + return cp_->GetConnection(remote_id); +} + +std::shared_ptr RpcClient::CreateRpcChannel( + const std::string& host, uint16_t port, std::shared_ptr ticket, + int rpc_timeout) { + std::shared_ptr channel = + std::make_shared(shared_from_this(), host, port, + ticket, rpc_timeout); + + /* static_pointer_cast is safe since RpcChannelImplementation derives + * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */ + return std::static_pointer_cast(channel); +} + +void RpcClient::CallMethod(const MethodDescriptor* method, + RpcController* controller, const Message* req_msg, + Message* resp_msg, Closure* done, + const std::string& host, uint16_t port, + std::shared_ptr ticket) { + std::shared_ptr shared_req(const_cast(req_msg)); + std::shared_ptr shared_resp(resp_msg); + + std::unique_ptr req = + std::make_unique(shared_req, shared_resp, method->name()); + + AsyncCall(host, port, std::move(req), ticket) + .then([done, this](Response resp) { done->Run(); }); +} diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h new file mode 100644 index 0000000..c24db9d --- /dev/null +++ b/hbase-native-client/connection/rpc-client.h @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection/connection-id.h" +#include "connection/connection-pool.h" +#include "connection/request.h" +#include "connection/response.h" +#include "security/user.h" + +#include + +using hbase::security::User; +using hbase::pb::ServerName; +using hbase::Request; +using hbase::Response; +using hbase::ConnectionId; +using hbase::ConnectionPool; +using hbase::RpcConnection; +using hbase::security::User; + +using google::protobuf::MethodDescriptor; +using google::protobuf::RpcChannel; +using google::protobuf::Message; +using google::protobuf::RpcController; +using google::protobuf::Closure; + +class RpcChannelImplementation; + +namespace hbase { + +class RpcClient : public std::enable_shared_from_this { + friend class RpcChannelImplementation; + + public: + RpcClient(); + + virtual ~RpcClient() { Close(); } + + virtual std::shared_ptr SyncCall(const std::string &host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket); + + virtual std::shared_ptr SyncCall(const std::string &host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string &service_name); + + virtual folly::Future AsyncCall(const std::string &host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket); + + virtual folly::Future AsyncCall(const std::string &host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string &service_name); + + virtual void Close(); + + virtual std::shared_ptr CreateRpcChannel( + const std::string &host, uint16_t port, std::shared_ptr ticket, + int rpc_timeout); + + private: + void CallMethod(const MethodDescriptor *method, RpcController *controller, + const Message *req_msg, Message *resp_msg, Closure *done, + const std::string &host, uint16_t port, + std::shared_ptr ticket); + std::shared_ptr GetConnection( + std::shared_ptr remote_id); + + private: + std::shared_ptr cp_; +}; + +class AbstractRpcChannel : public RpcChannel { + public: + AbstractRpcChannel(std::shared_ptr rpc_client, + const std::string &host, uint16_t port, + std::shared_ptr ticket, int rpc_timeout) + : rpc_client_(rpc_client), + host_(host), + port_(port), + ticket_(ticket), + rpc_timeout_(rpc_timeout) {} + + virtual ~AbstractRpcChannel() = default; + + protected: + std::shared_ptr rpc_client_; + std::string host_; + uint16_t port_; + std::shared_ptr ticket_; + int rpc_timeout_; +}; +} // namespace hbase diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h new file mode 100644 index 0000000..2e06ec3 --- /dev/null +++ b/hbase-native-client/connection/rpc-connection.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection/connection-id.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/service.h" + +#include +#include + +using hbase::HBaseService; + +namespace hbase { +class RpcConnection { + public: + RpcConnection(std::shared_ptr connection_id, + std::shared_ptr hbase_service) + : connection_id_(connection_id), hbase_service_(hbase_service) {} + + virtual ~RpcConnection() { Close(); } + + virtual std::shared_ptr remote_id() const { + return connection_id_; + } + + virtual std::shared_ptr get_service() const { + return hbase_service_; + } + + virtual folly::Future SendRequest(std::unique_ptr req) { + return (*hbase_service_)(std::move(req)); + } + + virtual void Close() { hbase_service_->close(); } + + private: + std::shared_ptr connection_id_; + std::shared_ptr hbase_service_; +}; +} // namespace hbase diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index a0ca5ca..4c29a61 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -26,6 +26,7 @@ #include #include "connection/response.h" +#include "connection/rpc-connection.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" #include "serde/region-info.h" @@ -34,6 +35,7 @@ using namespace std; using namespace folly; +using hbase::RpcConnection; using wangle::ServiceFilter; using hbase::Request; @@ -121,9 +123,14 @@ Future> LocationCache::LocateFromMeta( const TableName &tn, const string &row) { return this->LocateMeta() .via(cpu_executor_.get()) - .then([this](ServerName sn) { return this->cp_.Get(sn); }) - .then([tn, row, this](std::shared_ptr service) { - return (*service)(std::move(meta_util_.MetaRequest(tn, row))); + .then([this](ServerName sn) { + auto remote_id = + std::make_shared(sn.host_name(), sn.port()); + return this->cp_.GetConnection(remote_id); + }) + .then([tn, row, this](std::shared_ptr rpc_connection) { + return (*rpc_connection->get_service())( + std::move(meta_util_.MetaRequest(tn, row))); }) .then([this](Response resp) { // take the protobuf response and make it into @@ -139,8 +146,10 @@ Future> LocationCache::LocateFromMeta( return rl; }) .then([this](std::shared_ptr rl) { + auto remote_id = std::make_shared( + rl->server_name().host_name(), rl->server_name().port()); // Now fill out the connection. - rl->set_service(cp_.Get(rl->server_name())); + rl->set_service(cp_.GetConnection(remote_id)->get_service()); return rl; }); } diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/security/BUCK similarity index 59% copy from hbase-native-client/connection/BUCK copy to hbase-native-client/security/BUCK index f093d5a..5b935d3 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/security/BUCK @@ -18,35 +18,13 @@ # This is the library dealing with a single connection # to a single server. cxx_library( - name="connection", + name="security", exported_headers=[ - "client-dispatcher.h", - "client-handler.h", - "connection-factory.h", - "connection-pool.h", - "pipeline.h", - "request.h", - "response.h", - "service.h", + "user.h", ], srcs=[ - "client-dispatcher.cc", - "client-handler.cc", - "connection-factory.cc", - "connection-pool.cc", - "pipeline.cc", - "request.cc", ], deps=[ - "//if:if", - "//utils:utils", - "//serde:serde", - "//third-party:folly", - "//third-party:wangle", ], compiler_flags=['-Weffc++'], - visibility=['//core/...',],) -cxx_test( - name="connection-pool-test", - srcs=["connection-pool-test.cc",], - deps=[":connection",],) + visibility=['//core/...','//connection/...'],) diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h new file mode 100644 index 0000000..795f5ac --- /dev/null +++ b/hbase-native-client/security/user.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +namespace hbase { +namespace security { +class User { +public: + explicit User(const std::string& user_name) : user_name_(user_name) {} + virtual ~User() = default; + + std::string user_name() {return user_name_;} + + static std::shared_ptr defaultUser() { + return std::make_shared("__drwho"); + } +private: + std::string user_name_; +}; +} +}