hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-15766 Show working puts
Date Thu, 05 May 2016 17:24:54 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 f1997e0ef -> baa0a5e91


HBASE-15766 Show working puts

Summary:
Add an example showing how a set of puts to a single connection will work.
This still needs retries and a lookup of which region each request is going to.

Test Plan:
    ./buck-out/gen/core/simple-client -columns 100
    ../bin/hbase shell
    count 't'
    100 row(s) in 0.2470 seconds

Differential Revision: https://reviews.facebook.net/D57603


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/baa0a5e9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/baa0a5e9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/baa0a5e9

Branch: refs/heads/HBASE-14850
Commit: baa0a5e917d046f38e0758a4495efff3f0107e78
Parents: f1997e0
Author: Elliott Clark <eclark@apache.org>
Authored: Wed May 4 01:54:21 2016 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Thu May 5 10:23:03 2016 -0700

----------------------------------------------------------------------
 .../connection/client-dispatcher.cc             |  9 +--
 .../connection/client-dispatcher.h              | 11 ++-
 .../connection/client-handler.cc                | 13 ++--
 hbase-native-client/connection/client-handler.h |  4 +-
 .../connection/connection-factory.cc            | 46 ++++++------
 .../connection/connection-factory.h             | 11 ++-
 .../connection/connection-pool-test.cc          | 54 +++++++++++---
 .../connection/connection-pool.cc               | 33 +++++++--
 .../connection/connection-pool.h                | 10 ++-
 hbase-native-client/core/client.cc              |  6 +-
 hbase-native-client/core/location-cache.cc      | 25 +++----
 hbase-native-client/core/location-cache.h       |  6 +-
 hbase-native-client/core/meta-utils.cc          |  6 +-
 hbase-native-client/core/meta-utils.h           |  4 +-
 hbase-native-client/core/region-location.h      |  8 ++-
 hbase-native-client/core/simple-client.cc       | 76 +++++++++++++++++---
 .../serde/region-info-deserializer-test.cc      |  1 -
 hbase-native-client/serde/region-info.h         |  6 +-
 18 files changed, 231 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 6e2dc54..655d765 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -22,10 +22,11 @@ using namespace folly;
 using namespace hbase;
 using namespace wangle;
 
-ClientDispatcher::ClientDispatcher() : requests_(), current_call_id_(9) {}
+ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
 
 void ClientDispatcher::read(Context *ctx, Response in) {
   auto call_id = in.call_id();
+
   auto search = requests_.find(call_id);
   CHECK(search != requests_.end());
   auto p = std::move(search->second);
@@ -38,10 +39,10 @@ void ClientDispatcher::read(Context *ctx, Response in) {
 }
 
 Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
-  auto call_id = ++current_call_id_;
-
+  auto call_id = current_call_id_++;
   arg->set_call_id(call_id);
-  auto &p = requests_[call_id];
+  requests_.insert(call_id, Promise<Response>{});
+  auto &p = requests_.find(call_id)->second;
   auto f = p.getFuture();
   p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
     LOG(ERROR) << "e = " << call_id;

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/client-dispatcher.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 826fc6a..4435a1b 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -19,8 +19,12 @@
 
 #pragma once
 
+#include <folly/AtomicHashMap.h>
+#include <folly/Logging.h>
 #include <wangle/service/ClientDispatcher.h>
 
+#include <atomic>
+
 #include "connection/pipeline.h"
 #include "connection/request.h"
 #include "connection/response.h"
@@ -31,13 +35,16 @@ class ClientDispatcher
                                           std::unique_ptr<Request>, Response> {
 public:
   ClientDispatcher();
+  ~ClientDispatcher() {
+    LOG(ERROR) << "Killing ClientDispatcher call_id = " << current_call_id_;
+  }
   void read(Context *ctx, Response in) override;
   folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
   folly::Future<folly::Unit> close(Context *ctx) override;
   folly::Future<folly::Unit> close() override;
 
 private:
-  std::unordered_map<uint32_t, folly::Promise<Response>> requests_;
+  folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_;
   // Start at some number way above what could
   // be there for un-initialized call id counters.
   //
@@ -46,6 +53,6 @@ private:
   //
   // uint32_t has a max of 4Billion so 10 more or less is
   // not a big deal.
-  uint32_t current_call_id_;
+  std::atomic<uint32_t> current_call_id_;
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 496e4f2..b92ad89 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,7 +37,10 @@ using hbase::pb::GetResponse;
 using google::protobuf::Message;
 
 ClientHandler::ClientHandler(std::string user_name)
-    : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {}
+    : user_name_(user_name), need_send_header_(true), serde_(),
+      resp_msgs_(
+          make_unique<folly::AtomicHashMap<
+              uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}
 
 void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
   if (LIKELY(buf != nullptr)) {
@@ -51,14 +54,14 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
               << " has_exception=" << header.has_exception();
 
     // Get the response protobuf from the map
-    auto search = resp_msgs_.find(header.call_id());
+    auto search = resp_msgs_->find(header.call_id());
     // It's an error if it's not there.
-    CHECK(search != resp_msgs_.end());
+    CHECK(search != resp_msgs_->end());
     auto resp_msg = search->second;
     CHECK(resp_msg != nullptr);
 
     // Make sure we don't leak the protobuf
-    resp_msgs_.erase(search);
+    resp_msgs_->erase(header.call_id());
 
     // set the call_id.
     // This will be used to by the dispatcher to match up
@@ -96,7 +99,7 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
     ctx->fireWrite(std::move(pre));
   }
 
-  resp_msgs_[r->call_id()] = r->resp_msg();
+  resp_msgs_->insert(r->call_id(), r->resp_msg());
   return ctx->fireWrite(
       serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index ce99c9e..be5143c 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <folly/AtomicHashMap.h>
 #include <wangle/channel/Handler.h>
 
 #include <string>
@@ -51,7 +52,8 @@ private:
   RpcSerde serde_;
 
   // in flight requests
-  std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
+  std::unique_ptr<folly::AtomicHashMap<
+      uint32_t, std::shared_ptr<google::protobuf::Message>>>
       resp_msgs_;
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 9102d60..635d12d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -19,40 +19,36 @@
 
 #include "connection/connection-factory.h"
 
-#include <folly/futures/Future.h>
-#include <wangle/bootstrap/ClientBootstrap.h>
-#include <wangle/channel/AsyncSocketHandler.h>
-#include <wangle/channel/EventBaseHandler.h>
-#include <wangle/channel/OutputBufferingHandler.h>
-#include <wangle/service/ClientDispatcher.h>
-#include <wangle/service/CloseOnReleaseFilter.h>
-#include <wangle/service/ExpiringFilter.h>
-
-#include <string>
+#include <wangle/concurrent/GlobalExecutor.h>
 
 #include "connection/client-dispatcher.h"
 #include "connection/pipeline.h"
-#include "connection/request.h"
-#include "connection/response.h"
 #include "connection/service.h"
 
 using namespace folly;
 using namespace hbase;
-using namespace wangle;
 
-ConnectionFactory::ConnectionFactory() : bootstrap_() {
-  bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(1));
-  bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
-}
+ConnectionFactory::ConnectionFactory()
+    : io_pool_(std::static_pointer_cast<wangle::IOThreadPoolExecutor>(
+          wangle::getIOExecutor())),
+      pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {}
 
-std::shared_ptr<HBaseService>
-ConnectionFactory::make_connection(const std::string &host, int port) {
-  // Connect to a given server
-  // Then when connected create a ClientDispactcher.
-  auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
+std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>
+ConnectionFactory::MakeBootstrap() {
+  auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
+  client->group(io_pool_);
+  client->pipelineFactory(pipeline_factory_);
+
+  return client;
+}
+std::shared_ptr<HBaseService> ConnectionFactory::Connect(
+    std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
+    const std::string &hostname, int port) {
+  // Yes this will block however it makes dealing with connection pool soooooo
+  // much nicer.
+  // TODO see about using shared promise for this.
+  auto pipeline = client->connect(SocketAddress(hostname, port, true)).get();
   auto dispatcher = std::make_shared<ClientDispatcher>();
   dispatcher->setPipeline(pipeline);
-  auto service = std::make_shared<
-      CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher);
-  return service;
+  return dispatcher;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index fc4e161..2284a7c 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -33,10 +33,15 @@ public:
   ConnectionFactory();
   virtual ~ConnectionFactory() = default;
 
-  virtual std::shared_ptr<HBaseService> make_connection(const std::string &host,
-                                                        int port);
+  virtual std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>
+  MakeBootstrap();
+
+  virtual std::shared_ptr<HBaseService>
+  Connect(std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
+          const std::string &hostname, int port);
 
 private:
-  wangle::ClientBootstrap<SerializePipeline> bootstrap_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool_;
+  std::shared_ptr<RpcPipelineFactory> pipeline_factory_;
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 975bc5e..b1a0ba0 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -24,6 +24,7 @@
 
 #include "connection/connection-factory.h"
 #include "if/HBase.pb.h"
+#include "serde/server-name.h"
 
 using namespace hbase;
 
@@ -33,11 +34,16 @@ using ::testing::_;
 
 class MockConnectionFactory : public ConnectionFactory {
 public:
-  MOCK_METHOD2(make_connection,
-               std::shared_ptr<HBaseService>(const std::string &hostname,
-                                             int port));
+  MOCK_METHOD0(MakeBootstrap,
+               std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
+  MOCK_METHOD3(Connect,
+               std::shared_ptr<HBaseService>(
+                   std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
+                   const std::string &hostname, int port));
 };
 
+class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};
+
 class MockServiceBase : public HBaseService {
 public:
   folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
@@ -54,19 +60,20 @@ public:
 };
 
 TEST(TestConnectionPool, TestOnlyCreateOnce) {
-  std::string hostname{"hostname"};
+  auto hostname = std::string{"hostname"};
+  auto mock_boot = std::make_shared<MockBootstrap>();
   auto mock_service = std::make_shared<MockService>();
+  auto mock_cf = std::make_shared<MockConnectionFactory>();
   uint32_t port{999};
 
-  LOG(ERROR) << "About to make a MockConnectionFactory";
-  auto mock_cf = std::make_shared<MockConnectionFactory>();
-  EXPECT_CALL((*mock_cf), make_connection(_, _))
+  EXPECT_CALL((*mock_cf), Connect(_, _, _))
       .Times(1)
       .WillRepeatedly(Return(mock_service));
+  EXPECT_CALL((*mock_cf), MakeBootstrap())
+      .Times(1)
+      .WillRepeatedly(Return(mock_boot));
   ConnectionPool cp{mock_cf};
 
-  LOG(ERROR) << "Created ConnectionPool";
-
   ServerName sn;
   sn.set_host_name(hostname);
   sn.set_port(port);
@@ -75,3 +82,32 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) {
   ASSERT_TRUE(result != nullptr);
   result = cp.get(sn);
 }
+
+TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
+  std::string hostname_one{"hostname"};
+  std::string hostname_two{"hostname_two"};
+  uint32_t port{999};
+
+  auto mock_boot = std::make_shared<MockBootstrap>();
+  auto mock_service = std::make_shared<MockService>();
+  auto mock_cf = std::make_shared<MockConnectionFactory>();
+
+  EXPECT_CALL((*mock_cf), Connect(_, _, _))
+      .Times(2)
+      .WillRepeatedly(Return(mock_service));
+  EXPECT_CALL((*mock_cf), MakeBootstrap())
+      .Times(2)
+      .WillRepeatedly(Return(mock_boot));
+  ConnectionPool cp{mock_cf};
+
+  {
+    auto result_one = cp.get(folly::to<ServerName>(
+        hostname_one + ":" + folly::to<std::string>(port)));
+    auto result_two = cp.get(folly::to<ServerName>(
+        hostname_two + ":" + folly::to<std::string>(port)));
+  }
+  auto result_one = cp.get(
+      folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port)));
+  auto result_two = cp.get(
+      folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port)));
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index eafe60a..6ed5ad9 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -19,6 +19,7 @@
 
 #include "connection/connection-pool.h"
 
+#include <folly/SocketAddress.h>
 #include <wangle/service/Service.h>
 
 using std::mutex;
@@ -26,28 +27,46 @@ using std::unique_ptr;
 using std::shared_ptr;
 using hbase::pb::ServerName;
 using folly::SharedMutexWritePriority;
+using folly::SocketAddress;
 
 namespace hbase {
 
 ConnectionPool::ConnectionPool()
-    : cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() {
-}
+    : cf_(std::make_shared<ConnectionFactory>()), clients_(), connections_(),
+      map_mutex_() {}
 ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
-    : cf_(cf), connections_(), map_mutex_() {}
+    : cf_(cf), clients_(), connections_(), map_mutex_() {}
+
+ConnectionPool::~ConnectionPool() {
+  SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+  for (auto &item : connections_) {
+    auto &con = item.second;
+    con->close();
+  }
+  connections_.clear();
+  clients_.clear();
+}
 
 std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
+  // Create a read lock.
   SharedMutexWritePriority::UpgradeHolder holder(map_mutex_);
+
   auto found = connections_.find(sn);
   if (found == connections_.end() || found->second == nullptr) {
+    // Move the upgradable lock into the write lock if the connection
+    // hasn't been found.
     SharedMutexWritePriority::WriteHolder holder(std::move(holder));
-    auto new_con = cf_->make_connection(sn.host_name(), sn.port());
-    connections_[sn] = new_con;
-    return new_con;
+    auto client = cf_->MakeBootstrap();
+    auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port());
+    clients_.insert(std::make_pair(sn, client));
+    connections_.insert(std::make_pair(sn, dispatcher));
+    return dispatcher;
   }
   return found->second;
 }
+
 void ConnectionPool::close(const ServerName &sn) {
-  SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+  SharedMutexWritePriority::WriteHolder holder{map_mutex_};
 
   auto found = connections_.find(sn);
   if (found == connections_.end() || found->second == nullptr) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index b8330e3..907afdb 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -37,23 +37,29 @@ struct ServerNameHash {
   std::size_t operator()(hbase::pb::ServerName const &s) const {
     std::size_t h1 = std::hash<std::string>()(s.host_name());
     std::size_t h2 = std::hash<uint32_t>()(s.port());
-    return h1 ^ (h2 << 1);
+    return h1 ^ (h2 << 2);
   }
 };
 
 class ConnectionPool {
 public:
   ConnectionPool();
+  ~ConnectionPool();
   explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
   std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
   void close(const hbase::pb::ServerName &sn);
 
 private:
-  std::shared_ptr<ConnectionFactory> cf_;
   std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
                      ServerNameHash, ServerNameEquals>
       connections_;
+  std::unordered_map<
+      hbase::pb::ServerName,
+      std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
+      ServerNameHash, ServerNameEquals>
+      clients_;
   folly::SharedMutexWritePriority map_mutex_;
+  std::shared_ptr<ConnectionFactory> cf_;
 };
 
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 266c239..4b9f844 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -33,8 +33,4 @@ using namespace folly;
 using namespace std;
 using namespace hbase::pb;
 
-namespace hbase {
-
-Client::Client(string quorum_spec)
-    : location_cache_(quorum_spec, wangle::getCPUExecutor()) {}
-}
+namespace hbase {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 2667f11..e2a6251 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -25,8 +25,8 @@
 #include "connection/response.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
-#include "serde/server-name.h"
 #include "serde/region-info.h"
+#include "serde/server-name.h"
 #include "serde/zk.h"
 
 using namespace std;
@@ -109,17 +109,22 @@ ServerName LocationCache::ReadMetaLocation() {
 
 Future<std::shared_ptr<RegionLocation>>
 LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
-  auto exc = wangle::getIOExecutor();
+  auto exec = wangle::getCPUExecutor();
   return this->LocateMeta()
-      .then([&](ServerName sn) { return this->cp_.get(sn); })
-      .via(exc.get()) // Need to handle all rpc's on the IOExecutor.
+      .via(exec.get())
+      .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); })
       .then([&](std::shared_ptr<HBaseService> service) {
         return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
       })
-      .then([&](Response resp) {
+      .then([this](Response resp) {
         // take the protobuf response and make it into
         // a region location.
         return this->CreateLocation(std::move(resp));
+      })
+      .then([ exec = exec, this ](std::shared_ptr<RegionLocation> rl) {
+        // Now fill out the connection.
+        rl->set_service(cp_.get(rl->server_name()));
+        return rl;
       });
 }
 
@@ -162,16 +167,12 @@ private:
 };
 
 std::shared_ptr<RegionLocation>
-LocationCache::CreateLocation(const Response &resp){
+LocationCache::CreateLocation(const Response &resp) {
   auto resp_msg = static_pointer_cast<ScanResponse>(resp.response());
   auto &results = resp_msg->results().Get(0);
   auto &cells = results.cell();
-  LOG(ERROR) << "resp_msg = " << resp_msg->DebugString();
   auto ri = folly::to<RegionInfo>(cells.Get(0).value());
   auto sn = folly::to<ServerName>(cells.Get(1).value());
-
-  LOG(ERROR) << "RegionInfo = " << ri.DebugString();
-  LOG(ERROR) << "ServerName = " << sn.DebugString();
-  auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_);
-  return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped);
+  return std::make_shared<RegionLocation>(cells.Get(0).row(), std::move(ri), sn,
+                                          nullptr);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 99b5e5e..7f76428 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -48,9 +48,10 @@ public:
   // Meta Related Methods.
   // These are only public until testing is complete
   folly::Future<hbase::pb::ServerName> LocateMeta();
-  folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn,
-                                               const std::string &row);
+  folly::Future<std::shared_ptr<RegionLocation>>
+  LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row);
   void InvalidateMeta();
+  ConnectionPool cp_;
 
 private:
   void RefreshMetaLocation();
@@ -61,7 +62,6 @@ private:
   std::shared_ptr<folly::Executor> executor_;
   std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
   std::mutex meta_lock_;
-  ConnectionPool cp_;
   MetaUtil meta_util_;
 
   // TODO: migrate this to a smart pointer with a deleter.

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/meta-utils.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 1325d83..23d2041 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -37,12 +37,12 @@ using hbase::pb::RegionSpecifier_RegionSpecifierType;
 static const std::string META_REGION = "1588230740";
 
 std::string MetaUtil::RegionLookupRowkey(const TableName &tn,
-                                           const std::string &row) const {
+                                         const std::string &row) const {
   return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
 }
 
-std::unique_ptr<Request>
-MetaUtil::MetaRequest(const TableName tn, const std::string &row) const {
+std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn,
+                                               const std::string &row) const {
   auto request = Request::scan();
   auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/meta-utils.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index 5a659f3..dfef065 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -29,8 +29,8 @@ namespace hbase {
 class MetaUtil {
 public:
   std::string RegionLookupRowkey(const hbase::pb::TableName &tn,
-                                   const std::string &row) const;
+                                 const std::string &row) const;
   std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn,
-                                             const std::string &row) const;
+                                       const std::string &row) const;
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 7922c95..7887526 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -27,15 +27,19 @@ namespace hbase {
 
 class RegionLocation {
 public:
-  RegionLocation(hbase::pb::RegionInfo ri, hbase::pb::ServerName sn,
+  RegionLocation(std::string region_name, hbase::pb::RegionInfo ri,
+                 hbase::pb::ServerName sn,
                  std::shared_ptr<HBaseService> service)
-      : ri_(ri), sn_(sn), service_(service) {}
+      : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {}
 
   const hbase::pb::RegionInfo &region_info() { return ri_; }
   const hbase::pb::ServerName &server_name() { return sn_; }
+  const std::string &region_name() { return region_name_; }
   std::shared_ptr<HBaseService> service() { return service_; }
+  void set_service(std::shared_ptr<HBaseService> s) { service_ = s; }
 
 private:
+  std::string region_name_;
   hbase::pb::RegionInfo ri_;
   hbase::pb::ServerName sn_;
   std::shared_ptr<HBaseService> service_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 00e3369..39c82c3 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -19,16 +19,21 @@
 
 #include <folly/Logging.h>
 #include <folly/Random.h>
+#include <folly/futures/Future.h>
 #include <gflags/gflags.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
 #include <wangle/concurrent/GlobalExecutor.h>
 
+#include <atomic>
 #include <chrono>
 #include <iostream>
+#include <thread>
 
 #include "connection/connection-pool.h"
 #include "core/client.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
+#include "serde/server-name.h"
 #include "serde/table-name.h"
 
 using namespace folly;
@@ -39,16 +44,41 @@ using hbase::Request;
 using hbase::HBaseService;
 using hbase::LocationCache;
 using hbase::ConnectionPool;
+using hbase::ConnectionFactory;
 using hbase::pb::TableName;
 using hbase::pb::ServerName;
 using hbase::pb::RegionSpecifier_RegionSpecifierType;
-using hbase::pb::GetRequest;
-using hbase::pb::GetResponse;
+using hbase::pb::MutateRequest;
+using hbase::pb::MutationProto_MutationType;
 
 // TODO(eclark): remove the need for this.
 DEFINE_string(table, "t", "What region to send a get");
 DEFINE_string(row, "test", "What row to get");
 DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_uint64(columns, 10000, "How many columns to write");
+DEFINE_int32(threads, 6, "How many cpu threads");
+
+std::unique_ptr<Request> MakeRequest(uint64_t col, std::string region_name) {
+  auto req = Request::mutate();
+  auto msg = std::static_pointer_cast<MutateRequest>(req->req_msg());
+  auto region = msg->mutable_region();
+  auto suf = folly::to<std::string>(col);
+
+  region->set_value(region_name);
+  region->set_type(RegionSpecifier_RegionSpecifierType::
+                       RegionSpecifier_RegionSpecifierType_REGION_NAME);
+  auto mutation = msg->mutable_mutation();
+  mutation->set_row(FLAGS_row + suf);
+  mutation->set_mutate_type(
+      MutationProto_MutationType::MutationProto_MutationType_PUT);
+  auto column = mutation->add_column_value();
+  column->set_family("d");
+  auto qual = column->add_qualifier_value();
+  qual->set_qualifier(suf);
+  qual->set_value(".");
+
+  return std::move(req);
+}
 
 int main(int argc, char *argv[]) {
   google::SetUsageMessage(
@@ -56,13 +86,41 @@ int main(int argc, char *argv[]) {
   google::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
-  // Create a connection factory
-  ConnectionPool cp;
-  auto cpu_ex = wangle::getCPUExecutor();
-  LocationCache cache{FLAGS_zookeeper, cpu_ex};
-  auto result =
-      cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row)
-          .get(milliseconds(5000));
+  // Set up thread pools.
+  auto cpu_pool =
+      std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads);
+  wangle::setCPUExecutor(cpu_pool);
+  auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5);
+  wangle::setIOExecutor(io_pool);
+
+  // Create the cache.
+  LocationCache cache{FLAGS_zookeeper, cpu_pool};
+
+  auto row = FLAGS_row;
+  auto tn = folly::to<TableName>(FLAGS_table);
+
+  auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+  auto connection = loc->service();
+
+  auto num_puts = FLAGS_columns;
+
+  auto results = std::vector<Future<Response>>{};
+  uint64_t col{0};
+  for (; col < num_puts; col++) {
+    results.push_back(folly::makeFuture(col)
+                          .via(cpu_pool.get())
+                          .then([loc](uint64_t col) {
+                            return MakeRequest(col, loc->region_name());
+                          })
+                          .then([connection](std::unique_ptr<Request> req) {
+                            return (*connection)(std::move(req));
+                          }));
+  }
+  auto allf = folly::collect(results).get();
+
+  LOG(ERROR) << "Successfully sent  " << allf.size() << " requests.";
+
+  io_pool->stop();
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/serde/region-info-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc
index ce8dedf..5cb8482 100644
--- a/hbase-native-client/serde/region-info-deserializer-test.cc
+++ b/hbase-native-client/serde/region-info-deserializer-test.cc
@@ -44,7 +44,6 @@ TEST(TestRegionInfoDesializer, TestDeserialize) {
   ri_out.set_start_key(start_row);
   ri_out.set_end_key(stop_row);
 
-
   string header{"PBUF"};
   string ser = header + ri_out.SerializeAsString();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/baa0a5e9/hbase-native-client/serde/region-info.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
index 6af351c..e2ecfc9 100644
--- a/hbase-native-client/serde/region-info.h
+++ b/hbase-native-client/serde/region-info.h
@@ -21,16 +21,16 @@
 
 #include "if/HBase.pb.h"
 
-#include <folly/Conv.h>
 #include <boost/algorithm/string/predicate.hpp>
+#include <folly/Conv.h>
 
 namespace hbase {
 namespace pb {
-template <class String> void parseTo(String in, RegionInfo& out) {
+template <class String> void parseTo(String in, RegionInfo &out) {
   // TODO(eclark): there has to be something better.
   std::string s = folly::to<std::string>(in);
 
-  if (!boost::starts_with(s, "PBUF") ) {
+  if (!boost::starts_with(s, "PBUF")) {
     throw std::runtime_error("Region Info field doesn't contain preamble");
   }
   if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) {


Mime
View raw message