Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BACAA200ACA for ; Thu, 19 May 2016 00:49:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B9862160A27; Wed, 18 May 2016 22:49:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 97FB0160A2C for ; Thu, 19 May 2016 00:49:28 +0200 (CEST) Received: (qmail 3718 invoked by uid 500); 18 May 2016 22:49:24 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 2699 invoked by uid 99); 18 May 2016 22:49:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 May 2016 22:49:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A5B3E0844; Wed, 18 May 2016 22:49:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eclark@apache.org To: commits@hbase.apache.org Date: Wed, 18 May 2016 22:49:59 -0000 Message-Id: <05950ca43af947debaf5b5e8efc79e12@git.apache.org> In-Reply-To: <5a847047da0a4c87828fc6fa66936f0d@git.apache.org> References: <5a847047da0a4c87828fc6fa66936f0d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/42] hbase git commit: HBASE-15770 Stop using wangle's global executor archived-at: Wed, 18 May 2016 22:49:29 -0000 HBASE-15770 Stop using wangle's global executor Summary: Connection pool and connection factory now get thread pools through their constructor. This means that the client has the whole control over the threads. Test Plan: simple-client still writes. Differential Revision: https://reviews.facebook.net/D57801 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fe16ac8d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fe16ac8d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fe16ac8d Branch: refs/heads/HBASE-14850 Commit: fe16ac8d6a5b91c10a299683a8d82e359f608248 Parents: 35b32b0 Author: Elliott Clark Authored: Fri May 6 14:32:16 2016 -0700 Committer: Elliott Clark Committed: Wed May 18 15:48:52 2016 -0700 ---------------------------------------------------------------------- .../connection/client-dispatcher.h | 3 ++- .../connection/client-handler.cc | 2 +- .../connection/connection-factory.cc | 8 +++--- .../connection/connection-factory.h | 2 +- .../connection/connection-pool-test.cc | 1 + .../connection/connection-pool.cc | 7 ++--- .../connection/connection-pool.h | 2 +- hbase-native-client/core/client.cc | 23 +++++++++++----- hbase-native-client/core/client.h | 9 +++++-- hbase-native-client/core/location-cache-test.cc | 5 ++-- hbase-native-client/core/location-cache.cc | 28 +++++++++++--------- hbase-native-client/core/location-cache.h | 8 ++++-- hbase-native-client/core/simple-client.cc | 8 +++--- hbase-native-client/serde/server-name-test.cc | 1 - 14 files changed, 63 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 4bfb35d..2497cc7 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -31,7 +31,8 @@ namespace hbase { /** - * Dispatcher that assigns a call_id and then routes the response back to the future. + * Dispatcher that assigns a call_id and then routes the response back to the + * future. */ class ClientDispatcher : public wangle::ClientDispatcherBase()), resp_msgs_( make_unique>>(5000)) {} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 635d12d..beec6d5 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -19,8 +19,6 @@ #include "connection/connection-factory.h" -#include - #include "connection/client-dispatcher.h" #include "connection/pipeline.h" #include "connection/service.h" @@ -28,9 +26,9 @@ using namespace folly; using namespace hbase; -ConnectionFactory::ConnectionFactory() - : io_pool_(std::static_pointer_cast( - wangle::getIOExecutor())), +ConnectionFactory::ConnectionFactory( + std::shared_ptr io_pool) + : io_pool_(io_pool), pipeline_factory_(std::make_shared()) {} std::shared_ptr> http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index 8b6d8d8..fb5d9fe 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -39,7 +39,7 @@ public: * Constructor. * There should only be one ConnectionFactory per client. */ - ConnectionFactory(); + ConnectionFactory(std::shared_ptr io_pool); /** Default Desctructor */ virtual ~ConnectionFactory() = default; http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index b1a0ba0..c0c346f 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -34,6 +34,7 @@ using ::testing::_; class MockConnectionFactory : public ConnectionFactory { public: + MockConnectionFactory() : ConnectionFactory(nullptr) {} MOCK_METHOD0(MakeBootstrap, std::shared_ptr>()); MOCK_METHOD3(Connect, http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 75f343e..90e2056 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -31,9 +31,10 @@ using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; -ConnectionPool::ConnectionPool() - : cf_(std::make_shared()), clients_(), connections_(), - map_mutex_() {} +ConnectionPool::ConnectionPool( + std::shared_ptr io_executor) + : cf_(std::make_shared(io_executor)), clients_(), + connections_(), map_mutex_() {} ConnectionPool::ConnectionPool(std::shared_ptr cf) : cf_(cf), clients_(), connections_(), map_mutex_() {} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 5edd407..60f00de 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -61,7 +61,7 @@ struct ServerNameHash { class ConnectionPool { public: /** Create connection pool wit default connection factory */ - ConnectionPool(); + ConnectionPool(std::shared_ptr io_executor); /** * Desctructor. http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 4b9f844..1e80998 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -19,18 +19,27 @@ #include "core/client.h" -#include -#include -#include #include -#include #include -#include "if/ZooKeeper.pb.h" - using namespace folly; using namespace std; using namespace hbase::pb; -namespace hbase {} +namespace hbase { + +Client::Client(std::string zk_quorum) + : cpu_executor_(std::make_shared(4)), + io_executor_(std::make_shared( + sysconf(_SC_NPROCESSORS_ONLN))), + location_cache_(zk_quorum, cpu_executor_, io_executor_) {} + +// We can't have the threads continue running after everything is done +// that leads to an error. +Client::~Client() { + cpu_executor_->stop(); + io_executor_->stop(); +} + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 4a6d23b..4db82c4 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -21,6 +21,8 @@ #include #include +#include +#include #include @@ -33,19 +35,22 @@ namespace hbase { * Client. * * This is the class that provides access to an HBase cluster. - * It is thread safe and does connection pooling. Current recommendations are to have only one Client per cluster around. + * It is thread safe and does connection pooling. Current recommendations are to + * have only one Client per cluster around. */ class Client { public: - /** * Create a new client. * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ explicit Client(std::string quorum_spec); + ~Client(); private: LocationCache location_cache_; + std::shared_ptr cpu_executor_; + std::shared_ptr io_executor_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/location-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc index 172799d..8bc6383 100644 --- a/hbase-native-client/core/location-cache-test.cc +++ b/hbase-native-client/core/location-cache-test.cc @@ -18,14 +18,15 @@ */ #include #include -#include #include "location-cache.h" using namespace hbase; TEST(LocationCacheTest, TestGetMetaNodeContents) { // TODO(elliott): need to make a test utility for this. - LocationCache cache{"localhost:2181", wangle::getCPUExecutor()}; + auto cpu = std::make_shared(4); + auto io = std::make_shared(4); + LocationCache cache{"localhost:2181", cpu, io}; auto f = cache.LocateMeta(); auto result = f.get(); ASSERT_FALSE(f.hasException()); http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 583d305..6c018f9 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -20,7 +20,8 @@ #include #include -#include +#include +#include #include "connection/response.h" #include "if/Client.pb.h" @@ -48,10 +49,13 @@ using hbase::pb::RegionInfo; // TODO(eclark): make this configurable on client creation static const char META_ZNODE_NAME[] = "/hbase/meta-region-server"; -LocationCache::LocationCache(string quorum_spec, - shared_ptr executor) - : quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr), - meta_lock_(), cp_(), meta_util_(), zk_(nullptr) { +LocationCache::LocationCache( + std::string quorum_spec, + std::shared_ptr cpu_executor, + std::shared_ptr io_executor) + : quorum_spec_(quorum_spec), cpu_executor_(cpu_executor), + meta_promise_(nullptr), meta_lock_(), cp_(io_executor), meta_util_(), + zk_(nullptr) { zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0); } @@ -79,7 +83,7 @@ void LocationCache::InvalidateMeta() { /// MUST hold the meta_lock_ void LocationCache::RefreshMetaLocation() { meta_promise_ = make_unique>(); - executor_->add([&] { + cpu_executor_->add([&] { meta_promise_->setWith([&] { return this->ReadMetaLocation(); }); }); } @@ -109,10 +113,9 @@ ServerName LocationCache::ReadMetaLocation() { Future> LocationCache::LocateFromMeta(const TableName &tn, const string &row) { - auto exec = wangle::getCPUExecutor(); return this->LocateMeta() - .via(exec.get()) - .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); }) + .via(cpu_executor_.get()) + .then([this](ServerName sn) { return this->cp_.get(sn); }) .then([&](std::shared_ptr service) { return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) @@ -121,7 +124,7 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { // a region location. return this->CreateLocation(std::move(resp)); }) - .then([ exec = exec, this ](std::shared_ptr rl) { + .then([this](std::shared_ptr rl) { // Now fill out the connection. rl->set_service(cp_.get(rl->server_name())); return rl; @@ -129,14 +132,14 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { } /** - * Filter to remove a service from the location cache and the connection cache on errors + * Filter to remove a service from the location cache and the connection cache + * on errors * or on cloase. */ class RemoveServiceFilter : public ServiceFilter, Response> { public: - /** Create a new filter. */ RemoveServiceFilter(std::shared_ptr service, ServerName sn, ConnectionPool &cp) @@ -157,7 +160,6 @@ public: } } - /** Has this been closed */ virtual bool isAvailable() override { return !released && service_->isAvailable(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/location-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index 88bec18..e077750 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include @@ -51,7 +53,8 @@ public: * @param executor The cpu executor to run on. */ LocationCache(std::string quorum_spec, - std::shared_ptr executor); + std::shared_ptr cpu_exector, + std::shared_ptr io_executor); /** * Destructor. * This will clean up the zookeeper connections. @@ -81,8 +84,9 @@ private: hbase::pb::ServerName ReadMetaLocation(); std::shared_ptr CreateLocation(const Response &resp); + /* data */ std::string quorum_spec_; - std::shared_ptr executor_; + std::shared_ptr cpu_executor_; std::unique_ptr> meta_promise_; std::mutex meta_lock_; MetaUtil meta_util_; http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 39c82c3..f3f6c42 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include @@ -89,12 +89,10 @@ int main(int argc, char *argv[]) { // Set up thread pools. auto cpu_pool = std::make_shared(FLAGS_threads); - wangle::setCPUExecutor(cpu_pool); auto io_pool = std::make_shared(5); - wangle::setIOExecutor(io_pool); // Create the cache. - LocationCache cache{FLAGS_zookeeper, cpu_pool}; + LocationCache cache{FLAGS_zookeeper, cpu_pool, io_pool}; auto row = FLAGS_row; auto tn = folly::to(FLAGS_table); @@ -105,7 +103,7 @@ int main(int argc, char *argv[]) { auto num_puts = FLAGS_columns; auto results = std::vector>{}; - uint64_t col{0}; + auto col = uint64_t{0}; for (; col < num_puts; col++) { results.push_back(folly::makeFuture(col) .via(cpu_pool.get()) http://git-wip-us.apache.org/repos/asf/hbase/blob/fe16ac8d/hbase-native-client/serde/server-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc index 2281fa2..73a68d6 100644 --- a/hbase-native-client/serde/server-name-test.cc +++ b/hbase-native-client/serde/server-name-test.cc @@ -46,5 +46,4 @@ TEST(TestServerName, TestIPV6) { ASSERT_EQ("[::::1]", sn.host_name()); ASSERT_EQ(123, sn.port()); - }