hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-15731 Add on a connection pool
Date Fri, 29 Apr 2016 22:46:44 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 fdf98c0a3 -> 2e3a0c9ca


HBASE-15731 Add on a connection pool

Summary:
Add on a connection pool protected by read write mutex.
Add on a service filter that will remove a connection from a connection pool when closed

Test Plan: Need to add on tests.

Differential Revision: https://reviews.facebook.net/D57411


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2e3a0c9c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2e3a0c9c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2e3a0c9c

Branch: refs/heads/HBASE-14850
Commit: 2e3a0c9cac442e26e06c6f57ac1e2a35a3bdd2af
Parents: fdf98c0
Author: Elliott Clark <eclark@apache.org>
Authored: Thu Apr 28 19:53:42 2016 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Fri Apr 29 15:41:00 2016 -0700

----------------------------------------------------------------------
 hbase-native-client/connection/BUCK             |   5 +
 .../connection/connection-factory.cc            |   4 +-
 .../connection/connection-factory.h             |   5 +-
 .../connection/connection-pool-test.cc          |  77 ++++++++++++++
 .../connection/connection-pool.cc               |  89 ++++++++++++++++
 .../connection/connection-pool.h                |  59 +++++++++++
 hbase-native-client/connection/service.h        |   4 +-
 hbase-native-client/core/BUCK                   |   4 -
 hbase-native-client/core/client.h               |   3 -
 hbase-native-client/core/get-request.cc         |  19 ----
 hbase-native-client/core/get-request.h          |  35 -------
 hbase-native-client/core/get-result.cc          |  19 ----
 hbase-native-client/core/get-result.h           |  32 ------
 hbase-native-client/core/location-cache.cc      |   1 -
 hbase-native-client/core/simple-client.cc       | 105 +++++++++++--------
 15 files changed, 297 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index d393885..96f2136 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -22,6 +22,7 @@ cxx_library(name="connection",
                 "client-dispatcher.h",
                 "client-handler.h",
                 "connection-factory.h",
+                "connection-pool.h",
                 "pipeline.h",
                 "request.h",
                 "response.h",
@@ -31,6 +32,7 @@ cxx_library(name="connection",
                 "client-dispatcher.cc",
                 "client-handler.cc",
                 "connection-factory.cc",
+                "connection-pool.cc",
                 "pipeline.cc",
                 "request.cc",
             ],
@@ -42,3 +44,6 @@ cxx_library(name="connection",
                 "//third-party:wangle",
             ],
             visibility=['//core/...', ], )
+cxx_test(name="connection-pool-test",
+         srcs=["connection-pool-test.cc", ],
+         deps=[":connection", ], )

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 7073f9d..b546269 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -45,8 +45,8 @@ ConnectionFactory::ConnectionFactory() {
   bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
 }
 
-std::shared_ptr<Service<std::unique_ptr<Request>, Response>>
-ConnectionFactory::make_connection(std::string host, int port) {
+std::shared_ptr<HBaseService>
+ConnectionFactory::make_connection(const std::string &host, int port) {
   // Connect to a given server
   // Then when connected create a ClientDispactcher.
   auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 8d1d2f0..5a45316 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -31,8 +31,9 @@ namespace hbase {
 class ConnectionFactory {
 public:
   ConnectionFactory();
-  std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>>
-  make_connection(std::string host, int port);
+
+  virtual std::shared_ptr<HBaseService> make_connection(const std::string &host,
+                                                        int port);
 
 private:
   wangle::ClientBootstrap<SerializePipeline> bootstrap_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
new file mode 100644
index 0000000..975bc5e
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/connection-pool.h"
+
+#include <folly/Logging.h>
+#include <gmock/gmock.h>
+
+#include "connection/connection-factory.h"
+#include "if/HBase.pb.h"
+
+using namespace hbase;
+
+using hbase::pb::ServerName;
+using ::testing::Return;
+using ::testing::_;
+
+class MockConnectionFactory : public ConnectionFactory {
+public:
+  MOCK_METHOD2(make_connection,
+               std::shared_ptr<HBaseService>(const std::string &hostname,
+                                             int port));
+};
+
+class MockServiceBase : public HBaseService {
+public:
+  folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
+    return do_operation(req.get());
+  }
+  virtual folly::Future<Response> do_operation(Request *req) {
+    return folly::makeFuture<Response>(Response{});
+  }
+};
+
+class MockService : public MockServiceBase {
+public:
+  MOCK_METHOD1(do_operation, folly::Future<Response>(Request *));
+};
+
+TEST(TestConnectionPool, TestOnlyCreateOnce) {
+  std::string hostname{"hostname"};
+  auto mock_service = std::make_shared<MockService>();
+  uint32_t port{999};
+
+  LOG(ERROR) << "About to make a MockConnectionFactory";
+  auto mock_cf = std::make_shared<MockConnectionFactory>();
+  EXPECT_CALL((*mock_cf), make_connection(_, _))
+      .Times(1)
+      .WillRepeatedly(Return(mock_service));
+  ConnectionPool cp{mock_cf};
+
+  LOG(ERROR) << "Created ConnectionPool";
+
+  ServerName sn;
+  sn.set_host_name(hostname);
+  sn.set_port(port);
+
+  auto result = cp.get(sn);
+  ASSERT_TRUE(result != nullptr);
+  result = cp.get(sn);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
new file mode 100644
index 0000000..a967df2
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/connection-pool.h"
+
+#include <wangle/service/Service.h>
+
+using std::mutex;
+using std::unique_ptr;
+using std::shared_ptr;
+using hbase::pb::ServerName;
+using wangle::ServiceFilter;
+using folly::SharedMutexWritePriority;
+
+namespace hbase {
+
+class RemoveServiceFilter
+    : public ServiceFilter<unique_ptr<Request>, Response> {
+
+public:
+  RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
+                      ConnectionPool *cp)
+      : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn),
+        cp_(cp) {}
+
+  folly::Future<folly::Unit> close() override {
+    if (!released.exchange(true)) {
+      return this->service_->close().then(
+          [this]() { this->cp_->close(this->sn_); });
+    } else {
+      return folly::makeFuture();
+    }
+  }
+
+  virtual bool isAvailable() override { return service_->isAvailable(); }
+
+  folly::Future<Response> operator()(unique_ptr<Request> req) override {
+    return (*this->service_)(std::move(req));
+  }
+
+private:
+  std::atomic<bool> released{false};
+  hbase::pb::ServerName sn_;
+  ConnectionPool *cp_;
+};
+
+ConnectionPool::ConnectionPool() : cf_(std::make_shared<ConnectionFactory>()) {}
+ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
+    : cf_(cf) {}
+
+std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
+  SharedMutexWritePriority::UpgradeHolder holder(map_mutex_);
+  auto found = connections_.find(sn);
+  if (found == connections_.end() || found->second == nullptr) {
+    SharedMutexWritePriority::WriteHolder holder(std::move(holder));
+    auto new_con = cf_->make_connection(sn.host_name(), sn.port());
+    auto wrapped = std::make_shared<RemoveServiceFilter>(new_con, sn, this);
+    connections_[sn] = wrapped;
+    return new_con;
+  }
+  return found->second;
+}
+void ConnectionPool::close(ServerName sn) {
+  SharedMutexWritePriority::WriteHolder holder(map_mutex_);
+
+  auto found = connections_.find(sn);
+  if (found == connections_.end() || found->second == nullptr) {
+    return;
+  }
+  auto service = found->second;
+  connections_.erase(found);
+}
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
new file mode 100644
index 0000000..394cd71
--- /dev/null
+++ b/hbase-native-client/connection/connection-pool.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/SharedMutex.h>
+#include <mutex>
+#include <unordered_map>
+
+#include "connection/connection-factory.h"
+#include "connection/service.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+struct MyServerNameEquals {
+  bool operator()(const hbase::pb::ServerName &lhs,
+                  const hbase::pb::ServerName &rhs) const {
+    return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port();
+  }
+};
+struct MyServerNameHash {
+  std::size_t operator()(hbase::pb::ServerName const &s) const {
+    std::size_t h1 = std::hash<std::string>()(s.host_name());
+    std::size_t h2 = std::hash<uint32_t>()(s.port());
+    return h1 ^ (h2 << 1);
+  }
+};
+
+class ConnectionPool {
+public:
+  ConnectionPool();
+  explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
+  std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
+  void close(hbase::pb::ServerName sn);
+
+private:
+  std::shared_ptr<ConnectionFactory> cf_;
+  std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
+                     MyServerNameHash, MyServerNameEquals>
+      connections_;
+  folly::SharedMutexWritePriority map_mutex_;
+};
+
+} // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/connection/service.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h
index feb14ec..79f087d 100644
--- a/hbase-native-client/connection/service.h
+++ b/hbase-native-client/connection/service.h
@@ -18,9 +18,11 @@
  */
 #pragma once
 
+#include <wangle/service/Service.h>
+
 #include "connection/request.h"
 #include "connection/response.h"
 
 namespace hbase {
-using HBaseService = wangle::Service<Request, Response>;
+using HBaseService = wangle::Service<std::unique_ptr<Request>, Response>;
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 9db6fda..e555ba4 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -21,8 +21,6 @@ cxx_library(
     exported_headers=[
         "client.h",
         "connection.h",
-        "get-request.h",
-        "get-result.h",
         "hbase_macros.h",
         "location-cache.h",
         "table-name.h",
@@ -32,8 +30,6 @@ cxx_library(
     ],
     srcs=[
         "client.cc",
-        "get-request.cc",
-        "get-result.cc",
         "location-cache.cc",
         "meta-utils.cc",
         "table-name.cc",

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index b583285..4bed751 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -24,8 +24,6 @@
 
 #include <string>
 
-#include "core/get-request.h"
-#include "core/get-result.h"
 #include "core/location-cache.h"
 #include "if/Cell.pb.h"
 
@@ -33,7 +31,6 @@ namespace hbase {
 class Client {
 public:
   explicit Client(std::string quorum_spec);
-  folly::Future<GetResult> get(const GetRequest &get_request);
 
 private:
   LocationCache location_cache_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/get-request.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-request.cc b/hbase-native-client/core/get-request.cc
deleted file mode 100644
index e927ccc..0000000
--- a/hbase-native-client/core/get-request.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "core/get-request.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/get-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h
deleted file mode 100644
index bb755c5..0000000
--- a/hbase-native-client/core/get-request.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <string>
-
-#include "core/table-name.h"
-
-namespace hbase {
-
-class GetRequest {
-public:
-  GetRequest(TableName table_name, std::string key);
-
-private:
-  TableName table_name_;
-  std::string key_;
-};
-} // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/get-result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-result.cc b/hbase-native-client/core/get-result.cc
deleted file mode 100644
index 7eea483..0000000
--- a/hbase-native-client/core/get-result.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "core/get-result.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/get-result.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h
deleted file mode 100644
index a49ad98..0000000
--- a/hbase-native-client/core/get-result.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <string>
-
-namespace hbase {
-
-class GetResult {
-public:
-  explicit GetResult(std::string key);
-
-private:
-  std::string key_;
-};
-} // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 5925f4a..c81deba 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -77,7 +77,6 @@ ServerName LocationCache::ReadMetaLocation() {
   int zk_result =
       zoo_get(this->zk_, META_ZNODE_NAME, 0,
               reinterpret_cast<char *>(buf->writableData()), &len, nullptr);
-  LOG(ERROR) << "len = " << len;
   if (zk_result != ZOK || len < 9) {
     LOG(ERROR) << "Error getting meta location.";
     throw runtime_error("Error getting meta location");

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e3a0c9c/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 2cb6200..11dcd68 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -25,7 +25,7 @@
 #include <chrono>
 #include <iostream>
 
-#include "connection/connection-factory.h"
+#include "connection/connection-pool.h"
 #include "core/client.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
@@ -33,13 +33,20 @@
 using namespace folly;
 using namespace std;
 using namespace std::chrono;
-using namespace hbase;
-using namespace hbase::pb;
-using namespace google::protobuf;
+using hbase::Response;
+using hbase::Request;
+using hbase::HBaseService;
+using hbase::LocationCache;
+using hbase::ConnectionPool;
+using hbase::pb::ServerName;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+using hbase::pb::GetRequest;
+using hbase::pb::GetResponse;
 
 // TODO(eclark): remove the need for this.
 DEFINE_string(region, "1588230740", "What region to send a get to");
 DEFINE_string(row, "test", "What row to get");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
 
 int main(int argc, char *argv[]) {
   google::SetUsageMessage(
@@ -48,46 +55,52 @@ int main(int argc, char *argv[]) {
   google::InitGoogleLogging(argv[0]);
 
   // Create a connection factory
-  ConnectionFactory cf;
-
-  LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
-
-  auto result = cache.LocateMeta().get();
-
-  // Create a connection to the local host
-  auto conn = cf.make_connection(result.host_name(), result.port());
-
-  // Send the request
-  auto r = Request::get();
-
-  // This is a get request so make that
-  auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg());
-
-  // Set what region
-  req_msg->mutable_region()->set_value(FLAGS_region);
-  // It's always this.
-  req_msg->mutable_region()->set_type(
-      RegionSpecifier_RegionSpecifierType::
-          RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
-
-  // What row.
-  req_msg->mutable_get()->set_row(FLAGS_row);
-
-  // Send it.
-  auto resp = (*conn)(std::move(r)).get(milliseconds(5000));
-
-  auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
-  cout << "GetResponse has_result = " << get_resp->has_result() << '\n';
-  if (get_resp->has_result()) {
-    auto &r = get_resp->result();
-    cout << "Result cell_size = " << r.cell_size() << endl;
-    for (auto &cell : r.cell()) {
-      cout << "\trow = " << cell.row() << " family = " << cell.family()
-           << " qualifier = " << cell.qualifier()
-           << " timestamp = " << cell.timestamp() << " value = " <<
cell.value()
-           << endl;
-    }
-  }
-
-  return 0;
+  ConnectionPool cp;
+  auto cpu_ex = wangle::getCPUExecutor();
+  LocationCache cache{FLAGS_zookeeper, cpu_ex};
+  auto result =
+      cache.LocateMeta()
+          .then([&cp = cp](ServerName sn) { return cp.get(sn); })
+          .then([](shared_ptr<HBaseService> con) {
+            // Send the request
+            auto r = Request::get();
+            // This is a get request so make that
+            auto req_msg = static_pointer_cast<GetRequest>(r->req_msg());
+            // Set what region
+            req_msg->mutable_region()->set_value(FLAGS_region);
+            // It's always this.
+            req_msg->mutable_region()->set_type(
+                RegionSpecifier_RegionSpecifierType::
+                    RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+
+            // What row.
+            req_msg->mutable_get()->set_row(FLAGS_row);
+
+            return (*con)(std::move(r));
+          })
+          .then([](Response resp) {
+            return static_pointer_cast<GetResponse>(resp.response());
+          })
+          .via(cpu_ex.get())
+          .then([](shared_ptr<GetResponse> get_resp) {
+            cout << "GetResponse has_result = " << get_resp->has_result()
+                 << '\n';
+            if (get_resp->has_result()) {
+              auto &r = get_resp->result();
+              cout << "Result cell_size = " << r.cell_size() << endl;
+              for (auto &cell : r.cell()) {
+                cout << "\trow = " << cell.row()
+                     << " family = " << cell.family()
+                     << " qualifier = " << cell.qualifier()
+                     << " timestamp = " << cell.timestamp()
+                     << " value = " << cell.value() << endl;
+              }
+              return 0;
+            }
+
+            return 1;
+          })
+          .get(milliseconds(5000));
+
+  return result;
 }


Mime
View raw message