hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-17051 [C++] implement RPC client and connection management (Xiaobing Zhou)
Date Tue, 20 Dec 2016 02:39:01 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 5dbc1b5db -> 5459e0d6b


HBASE-17051 [C++] implement RPC client and connection management (Xiaobing Zhou)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5459e0d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5459e0d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5459e0d6

Branch: refs/heads/HBASE-14850
Commit: 5459e0d6beab8adaf5cb32bb94d3bbbc7a90eb46
Parents: 5dbc1b5
Author: Enis Soztutar <enis@apache.org>
Authored: Mon Dec 19 18:38:51 2016 -0800
Committer: Enis Soztutar <enis@apache.org>
Committed: Mon Dec 19 18:38:51 2016 -0800

----------------------------------------------------------------------
 hbase-native-client/Makefile                    |   2 +-
 hbase-native-client/connection/BUCK             |   3 +
 hbase-native-client/connection/connection-id.h  |  88 ++++++++++++++
 .../connection/connection-pool-test.cc          |  35 +++---
 .../connection/connection-pool.cc               |  47 ++++----
 .../connection/connection-pool.h                |  50 ++++----
 hbase-native-client/connection/rpc-client.cc    | 119 +++++++++++++++++++
 hbase-native-client/connection/rpc-client.h     | 116 ++++++++++++++++++
 hbase-native-client/connection/rpc-connection.h |  58 +++++++++
 hbase-native-client/core/location-cache.cc      |  17 ++-
 hbase-native-client/security/BUCK               |  30 +++++
 hbase-native-client/security/user.h             |  39 ++++++
 12 files changed, 531 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/Makefile
----------------------------------------------------------------------
diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile
index be5d461..96c275e 100644
--- a/hbase-native-client/Makefile
+++ b/hbase-native-client/Makefile
@@ -22,7 +22,7 @@ LD:=g++
 DEBUG_PATH = build/debug
 RELEASE_PATH = build/release
 PROTO_SRC_DIR = build/if
-MODULES = connection core serde test-util utils
+MODULES = connection core serde test-util utils security
 SRC_DIR = $(MODULES)
 DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES))
 RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES))

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index f093d5a..c22cc89 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -24,8 +24,10 @@ cxx_library(
         "client-handler.h",
         "connection-factory.h",
         "connection-pool.h",
+        "connection-id.h",
         "pipeline.h",
         "request.h",
+        "rpc-connection.h",
         "response.h",
         "service.h",
     ],
@@ -41,6 +43,7 @@ cxx_library(
         "//if:if",
         "//utils:utils",
         "//serde:serde",
+        "//security:security",
         "//third-party:folly",
         "//third-party:wangle",
     ],

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-id.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-id.h b/hbase-native-client/connection/connection-id.h
new file mode 100644
index 0000000..62fe222
--- /dev/null
+++ b/hbase-native-client/connection/connection-id.h
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include "if/HBase.pb.h"
+#include "security/user.h"
+
+#include <memory>
+#include <utility>
+#include <boost/functional/hash.hpp>
+
+using hbase::pb::ServerName;
+using hbase::security::User;
+
+namespace hbase {
+class ConnectionId {
+ public:
+  ConnectionId(const std::string &host, uint16_t port)
+      : ConnectionId(host, port, User::defaultUser(), "") {}
+
+  ConnectionId(const std::string &host, uint16_t port,
+               std::shared_ptr<User> user)
+      : ConnectionId(host, port, user, "") {}
+
+  ConnectionId(const std::string &host, uint16_t port,
+               std::shared_ptr<User> user, const std::string &service_name)
+      : user_(user), service_name_(service_name), host_(host), port_(port) {}
+
+  virtual ~ConnectionId() = default;
+
+  std::shared_ptr<User> user() const { return user_; }
+  std::string service_name() const { return service_name_; }
+  std::string host() { return host_; }
+  uint16_t port() { return port_; }
+
+ private:
+  std::shared_ptr<User> user_;
+  std::string service_name_;
+  std::string host_;
+  uint16_t port_;
+};
+
+/* Equals function for ConnectionId */
+struct ConnectionIdEquals {
+  /** equals */
+  bool operator()(const std::shared_ptr<ConnectionId> &lhs,
+                  const std::shared_ptr<ConnectionId> &rhs) const {
+    return userEquals(lhs->user(), rhs->user()) && lhs->host() == rhs->host() &&
+           lhs->port() == rhs->port();
+  }
+
+ private:
+  bool userEquals(const std::shared_ptr<User> &lhs,
+                  const std::shared_ptr<User> &rhs) const {
+    return lhs == nullptr ? rhs == nullptr
+                          : (rhs == nullptr ? false : lhs->user_name() ==
+                                                          rhs->user_name());
+  }
+};
+
+/** Hash for ConnectionId. */
+struct ConnectionIdHash {
+  /** hash */
+  std::size_t operator()(const std::shared_ptr<ConnectionId> &ci) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, ci->user() == nullptr ? 0 : ci->user()->user_name());
+    boost::hash_combine(h, ci->host());
+    boost::hash_combine(h, ci->port());
+    return h;
+  }
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index e17c16c..4547b30 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -17,20 +17,22 @@
  *
  */
 
+#include "connection/connection-id.h"
 #include "connection/connection-pool.h"
-
-#include <folly/Logging.h>
-#include <gmock/gmock.h>
-
 #include "connection/connection-factory.h"
+
 #include "if/HBase.pb.h"
 #include "serde/server-name.h"
 
+#include <folly/Logging.h>
+#include <gmock/gmock.h>
+
 using namespace hbase;
 
 using hbase::pb::ServerName;
 using ::testing::Return;
 using ::testing::_;
+using hbase::ConnectionId;
 
 class MockConnectionFactory : public ConnectionFactory {
  public:
@@ -75,13 +77,10 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) {
       .WillRepeatedly(Return(mock_boot));
   ConnectionPool cp{mock_cf};
 
-  ServerName sn;
-  sn.set_host_name(hostname);
-  sn.set_port(port);
-
-  auto result = cp.Get(sn);
+  auto remote_id = std::make_shared<ConnectionId>(hostname, port);
+  auto result = cp.GetConnection(remote_id);
   ASSERT_TRUE(result != nullptr);
-  result = cp.Get(sn);
+  result = cp.GetConnection(remote_id);
 }
 
 TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
@@ -102,13 +101,13 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
   ConnectionPool cp{mock_cf};
 
   {
-    auto result_one = cp.Get(folly::to<ServerName>(
-        hostname_one + ":" + folly::to<std::string>(port)));
-    auto result_two = cp.Get(folly::to<ServerName>(
-        hostname_two + ":" + folly::to<std::string>(port)));
+    auto remote_id = std::make_shared<ConnectionId>(hostname_one, port);
+    auto result_one = cp.GetConnection(remote_id);
+    auto remote_id2 = std::make_shared<ConnectionId>(hostname_two, port);
+    auto result_two = cp.GetConnection(remote_id2);
   }
-  auto result_one = cp.Get(
-      folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port)));
-  auto result_two = cp.Get(
-      folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port)));
+  auto remote_id = std::make_shared<ConnectionId>(hostname_one, port);
+  auto result_one = cp.GetConnection(remote_id);
+  auto remote_id2 = std::make_shared<ConnectionId>(hostname_two, port);
+  auto result_two = cp.GetConnection(remote_id2);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index c216d0b..e022f9e 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -28,7 +28,6 @@
 using std::mutex;
 using std::unique_ptr;
 using std::shared_ptr;
-using hbase::pb::ServerName;
 using hbase::ConnectionPool;
 using hbase::HBaseService;
 using folly::SharedMutexWritePriority;
@@ -47,33 +46,36 @@ ConnectionPool::~ConnectionPool() {
   SharedMutexWritePriority::WriteHolder holder(map_mutex_);
   for (auto &item : connections_) {
     auto &con = item.second;
-    con->close();
+    con->Close();
   }
   connections_.clear();
   clients_.clear();
 }
 
-std::shared_ptr<HBaseService> ConnectionPool::Get(const ServerName &sn) {
+std::shared_ptr<RpcConnection> ConnectionPool::GetConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
   // Try and get th cached connection.
-  auto found_ptr = GetCached(sn);
+  auto found_ptr = GetCachedConnection(remote_id);
 
   // If there's no connection then create it.
   if (found_ptr == nullptr) {
-    found_ptr = GetNew(sn);
+    found_ptr = GetNewConnection(remote_id);
   }
   return found_ptr;
 }
 
-std::shared_ptr<HBaseService> ConnectionPool::GetCached(const ServerName &sn) {
+std::shared_ptr<RpcConnection> ConnectionPool::GetCachedConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
   SharedMutexWritePriority::ReadHolder holder(map_mutex_);
-  auto found = connections_.find(sn);
+  auto found = connections_.find(remote_id);
   if (found == connections_.end()) {
     return nullptr;
   }
   return found->second;
 }
 
-std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) {
+std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
   // Grab the upgrade lock. While we are double checking other readers can
   // continue on
   SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_};
@@ -81,7 +83,7 @@ std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) {
   // Now check if someone else created the connection before we got the lock
   // This is safe since we hold the upgrade lock.
   // upgrade lock is more power than the reader lock.
-  auto found = connections_.find(sn);
+  auto found = connections_.find(remote_id);
   if (found != connections_.end() && found->second != nullptr) {
     return found->second;
   } else {
@@ -89,24 +91,29 @@ std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) {
     SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
 
     // Make double sure there are not stale connections hanging around.
-    connections_.erase(sn);
-
-    // Nope we are the ones who should create the new connection.
-    auto client = cf_->MakeBootstrap();
-    auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port());
-    clients_.insert(std::make_pair(sn, client));
-    connections_.insert(std::make_pair(sn, dispatcher));
-    return dispatcher;
+    connections_.erase(remote_id);
+
+    /* create new connection */
+    auto clientBootstrap = cf_->MakeBootstrap();
+    auto dispatcher =
+        cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port());
+
+    auto conneciton = std::make_shared<RpcConnection>(remote_id, dispatcher);
+
+    connections_.insert(std::make_pair(remote_id, conneciton));
+    clients_.insert(std::make_pair(remote_id, clientBootstrap));
+
+    return conneciton;
   }
 }
 
-void ConnectionPool::Close(const ServerName &sn) {
+void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
   SharedMutexWritePriority::WriteHolder holder{map_mutex_};
 
-  auto found = connections_.find(sn);
+  auto found = connections_.find(remote_id);
   if (found == connections_.end() || found->second == nullptr) {
     return;
   }
-  auto service = found->second;
+  found->second->Close();
   connections_.erase(found);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 2624336..96d89ac 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -25,38 +25,24 @@
 #include <unordered_map>
 
 #include "connection/connection-factory.h"
+#include "connection/connection-id.h"
+#include "connection/rpc-connection.h"
 #include "connection/service.h"
 #include "if/HBase.pb.h"
 
-namespace hbase {
 
-/** Equals function for server name that ignores start time */
-struct ServerNameEquals {
-  /** equals */
-  bool operator()(const hbase::pb::ServerName &lhs,
-                  const hbase::pb::ServerName &rhs) const {
-    return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port();
-  }
-};
+using hbase::ConnectionId;
+using hbase::ConnectionIdEquals;
+using hbase::ConnectionIdHash;
+using hbase::RpcConnection;
 
-/** Hash for ServerName that ignores the start time. */
-struct ServerNameHash {
-  /** hash */
-  std::size_t operator()(hbase::pb::ServerName const &s) const {
-    std::size_t h = 0;
-    boost::hash_combine(h, s.host_name());
-    boost::hash_combine(h, s.port());
-    return h;
-  }
-};
+namespace hbase {
 
 /**
  * @brief Connection pooling for HBase rpc connection.
  *
  * This is a thread safe connection pool. It allows getting
- * a shared connection to HBase by server name. This is
- * useful for keeping a single connection no matter how many regions a
- * regionserver has on it.
+ * a shared rpc connection to HBase servers by connection id.
  */
 class ConnectionPool {
  public:
@@ -81,23 +67,27 @@ class ConnectionPool {
    * Get a connection to the server name. Start time is ignored.
    * This can be a blocking operation for a short time.
    */
-  std::shared_ptr<HBaseService> Get(const hbase::pb::ServerName &sn);
+  std::shared_ptr<RpcConnection> GetConnection(
+      std::shared_ptr<ConnectionId> remote_id);
 
   /**
    * Close/remove a connection.
    */
-  void Close(const hbase::pb::ServerName &sn);
+  void Close(std::shared_ptr<ConnectionId> remote_id);
 
  private:
-  std::shared_ptr<HBaseService> GetCached(const hbase::pb::ServerName &sn);
-  std::shared_ptr<HBaseService> GetNew(const hbase::pb::ServerName &sn);
-  std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
-                     ServerNameHash, ServerNameEquals>
+  std::shared_ptr<RpcConnection> GetCachedConnection(
+      std::shared_ptr<ConnectionId> remote_id);
+  std::shared_ptr<RpcConnection> GetNewConnection(
+      std::shared_ptr<ConnectionId> remote_id);
+  std::unordered_map<std::shared_ptr<ConnectionId>,
+                     std::shared_ptr<RpcConnection>, ConnectionIdHash,
+                     ConnectionIdEquals>
       connections_;
   std::unordered_map<
-      hbase::pb::ServerName,
+      std::shared_ptr<ConnectionId>,
       std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
-      ServerNameHash, ServerNameEquals>
+      ConnectionIdHash, ConnectionIdEquals>
       clients_;
   folly::SharedMutexWritePriority map_mutex_;
   std::shared_ptr<ConnectionFactory> cf_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
new file mode 100644
index 0000000..66ec231
--- /dev/null
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/rpc-client.h"
+#include <unistd.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+using hbase::RpcClient;
+using hbase::AbstractRpcChannel;
+
+namespace hbase {
+
+class RpcChannelImplementation : public AbstractRpcChannel {
+ public:
+  RpcChannelImplementation(std::shared_ptr<RpcClient> rpc_client,
+                           const std::string& host, uint16_t port,
+                           std::shared_ptr<User> ticket, int rpc_timeout)
+      : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {}
+
+  void CallMethod(const MethodDescriptor* method, RpcController* controller,
+                  const Message* request, Message* response,
+                  Closure* done) override {
+    rpc_client_->CallMethod(method, controller, request, response, done, host_,
+                            port_, ticket_);
+  }
+};
+}  // namespace hbase
+
+RpcClient::RpcClient() {
+  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(
+      sysconf(_SC_NPROCESSORS_ONLN));
+
+  cp_ = std::make_shared<ConnectionPool>(io_executor);
+}
+
+void RpcClient::Close() {}
+
+std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host,
+                                              uint16_t port,
+                                              std::unique_ptr<Request> req,
+                                              std::shared_ptr<User> ticket) {
+  return std::make_shared<Response>(
+      AsyncCall(host, port, std::move(req), ticket).get());
+}
+
+std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host,
+                                              uint16_t port,
+                                              std::unique_ptr<Request> req,
+                                              std::shared_ptr<User> ticket,
+                                              const std::string& service_name) {
+  return std::make_shared<Response>(
+      AsyncCall(host, port, std::move(req), ticket, service_name).get());
+}
+
+folly::Future<Response> RpcClient::AsyncCall(const std::string& host,
+                                             uint16_t port,
+                                             std::unique_ptr<Request> req,
+                                             std::shared_ptr<User> ticket) {
+  auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
+  return GetConnection(remote_id)->SendRequest(std::move(req));
+}
+
+folly::Future<Response> RpcClient::AsyncCall(const std::string& host,
+                                             uint16_t port,
+                                             std::unique_ptr<Request> req,
+                                             std::shared_ptr<User> ticket,
+                                             const std::string& service_name) {
+  auto remote_id =
+      std::make_shared<ConnectionId>(host, port, ticket, service_name);
+  return GetConnection(remote_id)->SendRequest(std::move(req));
+}
+
+std::shared_ptr<RpcConnection> RpcClient::GetConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
+  return cp_->GetConnection(remote_id);
+}
+
+std::shared_ptr<RpcChannel> RpcClient::CreateRpcChannel(
+    const std::string& host, uint16_t port, std::shared_ptr<User> ticket,
+    int rpc_timeout) {
+  std::shared_ptr<RpcChannelImplementation> channel =
+      std::make_shared<RpcChannelImplementation>(shared_from_this(), host, port,
+                                                 ticket, rpc_timeout);
+
+  /* static_pointer_cast is safe since RpcChannelImplementation derives
+   * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */
+  return std::static_pointer_cast<RpcChannel>(channel);
+}
+
+void RpcClient::CallMethod(const MethodDescriptor* method,
+                           RpcController* controller, const Message* req_msg,
+                           Message* resp_msg, Closure* done,
+                           const std::string& host, uint16_t port,
+                           std::shared_ptr<User> ticket) {
+  std::shared_ptr<Message> shared_req(const_cast<Message*>(req_msg));
+  std::shared_ptr<Message> shared_resp(resp_msg);
+
+  std::unique_ptr<Request> req =
+      std::make_unique<Request>(shared_req, shared_resp, method->name());
+
+  AsyncCall(host, port, std::move(req), ticket)
+      .then([done, this](Response resp) { done->Run(); });
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
new file mode 100644
index 0000000..c24db9d
--- /dev/null
+++ b/hbase-native-client/connection/rpc-client.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include "connection/connection-id.h"
+#include "connection/connection-pool.h"
+#include "connection/request.h"
+#include "connection/response.h"
+#include "security/user.h"
+
+#include <google/protobuf/service.h>
+
+using hbase::security::User;
+using hbase::pb::ServerName;
+using hbase::Request;
+using hbase::Response;
+using hbase::ConnectionId;
+using hbase::ConnectionPool;
+using hbase::RpcConnection;
+using hbase::security::User;
+
+using google::protobuf::MethodDescriptor;
+using google::protobuf::RpcChannel;
+using google::protobuf::Message;
+using google::protobuf::RpcController;
+using google::protobuf::Closure;
+
+class RpcChannelImplementation;
+
+namespace hbase {
+
+class RpcClient : public std::enable_shared_from_this<RpcClient> {
+  friend class RpcChannelImplementation;
+
+ public:
+  RpcClient();
+
+  virtual ~RpcClient() { Close(); }
+
+  virtual std::shared_ptr<Response> SyncCall(const std::string &host,
+                                             uint16_t port,
+                                             std::unique_ptr<Request> req,
+                                             std::shared_ptr<User> ticket);
+
+  virtual std::shared_ptr<Response> SyncCall(const std::string &host,
+                                             uint16_t port,
+                                             std::unique_ptr<Request> req,
+                                             std::shared_ptr<User> ticket,
+                                             const std::string &service_name);
+
+  virtual folly::Future<Response> AsyncCall(const std::string &host,
+                                            uint16_t port,
+                                            std::unique_ptr<Request> req,
+                                            std::shared_ptr<User> ticket);
+
+  virtual folly::Future<Response> AsyncCall(const std::string &host,
+                                            uint16_t port,
+                                            std::unique_ptr<Request> req,
+                                            std::shared_ptr<User> ticket,
+                                            const std::string &service_name);
+
+  virtual void Close();
+
+  virtual std::shared_ptr<RpcChannel> CreateRpcChannel(
+      const std::string &host, uint16_t port, std::shared_ptr<User> ticket,
+      int rpc_timeout);
+
+ private:
+  void CallMethod(const MethodDescriptor *method, RpcController *controller,
+                  const Message *req_msg, Message *resp_msg, Closure *done,
+                  const std::string &host, uint16_t port,
+                  std::shared_ptr<User> ticket);
+  std::shared_ptr<RpcConnection> GetConnection(
+      std::shared_ptr<ConnectionId> remote_id);
+
+ private:
+  std::shared_ptr<ConnectionPool> cp_;
+};
+
+class AbstractRpcChannel : public RpcChannel {
+ public:
+  AbstractRpcChannel(std::shared_ptr<RpcClient> rpc_client,
+                     const std::string &host, uint16_t port,
+                     std::shared_ptr<User> ticket, int rpc_timeout)
+      : rpc_client_(rpc_client),
+        host_(host),
+        port_(port),
+        ticket_(ticket),
+        rpc_timeout_(rpc_timeout) {}
+
+  virtual ~AbstractRpcChannel() = default;
+
+ protected:
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::string host_;
+  uint16_t port_;
+  std::shared_ptr<User> ticket_;
+  int rpc_timeout_;
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h
new file mode 100644
index 0000000..2e06ec3
--- /dev/null
+++ b/hbase-native-client/connection/rpc-connection.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include "connection/connection-id.h"
+#include "connection/request.h"
+#include "connection/response.h"
+#include "connection/service.h"
+
+#include <memory>
+#include <utility>
+
+using hbase::HBaseService;
+
+namespace hbase {
+class RpcConnection {
+ public:
+  RpcConnection(std::shared_ptr<ConnectionId> connection_id,
+                std::shared_ptr<HBaseService> hbase_service)
+      : connection_id_(connection_id), hbase_service_(hbase_service) {}
+
+  virtual ~RpcConnection() { Close(); }
+
+  virtual std::shared_ptr<ConnectionId> remote_id() const {
+    return connection_id_;
+  }
+
+  virtual std::shared_ptr<HBaseService> get_service() const {
+    return hbase_service_;
+  }
+
+  virtual folly::Future<Response> SendRequest(std::unique_ptr<Request> req) {
+    return (*hbase_service_)(std::move(req));
+  }
+
+  virtual void Close() { hbase_service_->close(); }
+
+ private:
+  std::shared_ptr<ConnectionId> connection_id_;
+  std::shared_ptr<HBaseService> hbase_service_;
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index a0ca5ca..4c29a61 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -26,6 +26,7 @@
 #include <utility>
 
 #include "connection/response.h"
+#include "connection/rpc-connection.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
 #include "serde/region-info.h"
@@ -34,6 +35,7 @@
 
 using namespace std;
 using namespace folly;
+using hbase::RpcConnection;
 
 using wangle::ServiceFilter;
 using hbase::Request;
@@ -121,9 +123,14 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
     const TableName &tn, const string &row) {
   return this->LocateMeta()
       .via(cpu_executor_.get())
-      .then([this](ServerName sn) { return this->cp_.Get(sn); })
-      .then([tn, row, this](std::shared_ptr<HBaseService> service) {
-        return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
+      .then([this](ServerName sn) {
+        auto remote_id =
+            std::make_shared<ConnectionId>(sn.host_name(), sn.port());
+        return this->cp_.GetConnection(remote_id);
+      })
+      .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
+        return (*rpc_connection->get_service())(
+            std::move(meta_util_.MetaRequest(tn, row)));
       })
       .then([this](Response resp) {
         // take the protobuf response and make it into
@@ -139,8 +146,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
         return rl;
       })
       .then([this](std::shared_ptr<RegionLocation> rl) {
+        auto remote_id = std::make_shared<ConnectionId>(
+            rl->server_name().host_name(), rl->server_name().port());
         // Now fill out the connection.
-        rl->set_service(cp_.Get(rl->server_name()));
+        rl->set_service(cp_.GetConnection(remote_id)->get_service());
         return rl;
       });
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/security/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/security/BUCK b/hbase-native-client/security/BUCK
new file mode 100644
index 0000000..5b935d3
--- /dev/null
+++ b/hbase-native-client/security/BUCK
@@ -0,0 +1,30 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the library dealing with a single connection
+# to a single server.
+cxx_library(
+    name="security",
+    exported_headers=[
+        "user.h",
+    ],
+    srcs=[
+    ],
+    deps=[
+    ],
+    compiler_flags=['-Weffc++'],
+    visibility=['//core/...','//connection/...'],)

http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/security/user.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h
new file mode 100644
index 0000000..795f5ac
--- /dev/null
+++ b/hbase-native-client/security/user.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <string>
+
+namespace hbase {
+namespace security {
+class User {
+public:
+  explicit User(const std::string& user_name) : user_name_(user_name) {}
+  virtual ~User()  = default;
+
+  std::string user_name() {return user_name_;}
+
+  static std::shared_ptr<User> defaultUser() {
+    return std::make_shared<User>("__drwho");
+  }
+private:
+  std::string user_name_;
+};
+}
+}


Mime
View raw message