hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-17754 [C++] RawAsyncTable
Date Sat, 18 Mar 2017 02:23:19 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 d2bc6b1b7 -> a772ba326


HBASE-17754 [C++] RawAsyncTable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a772ba32
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a772ba32
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a772ba32

Branch: refs/heads/HBASE-14850
Commit: a772ba326503ca98e10cfb19450860b2da33d37c
Parents: d2bc6b1
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Mar 17 19:23:09 2017 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Mar 17 19:23:09 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/core/BUCK                   |   9 +-
 hbase-native-client/core/async-connection.cc    |  64 ++++++++
 hbase-native-client/core/async-connection.h     | 111 +++++++++++++
 hbase-native-client/core/async-region-locator.h |  62 +++++++
 .../core/async-rpc-retrying-caller-factory.h    |  61 ++++---
 .../core/async-rpc-retrying-caller.h            |  49 +++---
 .../core/async-rpc-retrying-test.cc             | 160 +++++++++----------
 hbase-native-client/core/client-test.cc         |   2 +
 hbase-native-client/core/client.cc              |  51 ++----
 hbase-native-client/core/client.h               |  38 ++---
 .../core/connection-configuration.h             |  16 +-
 hbase-native-client/core/location-cache-test.cc |   1 +
 hbase-native-client/core/location-cache.cc      |  10 +-
 hbase-native-client/core/location-cache.h       |  15 +-
 hbase-native-client/core/meta-utils.cc          |   2 +-
 hbase-native-client/core/raw-async-table.cc     |  80 ++++++++++
 hbase-native-client/core/raw-async-table.h      |  77 +++++++++
 hbase-native-client/core/response-converter.cc  |  20 ++-
 hbase-native-client/core/response-converter.h   |   6 +-
 hbase-native-client/core/table.cc               |  39 ++---
 hbase-native-client/core/table.h                |  22 +--
 hbase-native-client/security/user.h             |   1 +
 hbase-native-client/utils/time-util.h           |  27 +++-
 23 files changed, 667 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 3df48d6..2d77f2d 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -19,6 +19,8 @@
 cxx_library(
     name="core",
     exported_headers=[
+        "async-connection.h",
+        "async-region-locator.h",
         "client.h",
         "cell.h",
         "hbase-macros.h",
@@ -38,12 +40,14 @@ cxx_library(
         "request-converter.h",
         "response-converter.h",
         "table.h",
+        "raw-async-table.h",
         "async-rpc-retrying-caller-factory.h",
         "async-rpc-retrying-caller.h",
         "hbase-rpc-controller.h",
         "zk-util.h",
     ],
     srcs=[
+        "async-connection.cc",
         "cell.cc",
         "client.cc",
         "keyvalue-codec.cc",
@@ -52,6 +56,7 @@ cxx_library(
         "get.cc",
         "time-range.cc",
         "scan.cc",
+        "raw-async-table.cc",
         "result.cc",
         "request-converter.cc",
         "response-converter.cc",
@@ -70,9 +75,7 @@ cxx_library(
         "//third-party:zookeeper_mt",
     ],
     compiler_flags=['-Weffc++', '-ggdb'],
-    visibility=[
-        'PUBLIC',
-    ],)
+    visibility=['PUBLIC',],)
 cxx_library(
     name="conf",
     exported_headers=[

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-connection.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc
new file mode 100644
index 0000000..b945e38
--- /dev/null
+++ b/hbase-native-client/core/async-connection.cc
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {
+
+void AsyncConnectionImpl::Init() {
+  connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
+  // start thread pools
+  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
+  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
+  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
+
+  std::shared_ptr<Codec> codec = nullptr;
+  if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
+      std::string(KeyValueCodec::kJavaClassName)) {
+    codec = std::make_shared<hbase::KeyValueCodec>();
+  } else {
+    LOG(WARNING) << "Not using RPC Cell Codec";
+  }
+  rpc_client_ =
+      std::make_shared<hbase::RpcClient>(io_executor_, codec, connection_conf_->connect_timeout());
+  location_cache_ =
+      std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
+  caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+}
+
+// Stop the executor threads and close the RPC client on destruction; leaving
+// the threads running after everything else is torn down leads to an error.
+AsyncConnectionImpl::~AsyncConnectionImpl() {
+  cpu_executor_->stop();
+  io_executor_->stop();
+  if (rpc_client_.get()) rpc_client_->Close();
+}
+
+void AsyncConnectionImpl::Close() {
+  if (is_closed_) return;
+
+  cpu_executor_->stop();
+  io_executor_->stop();
+  if (rpc_client_.get()) rpc_client_->Close();
+  is_closed_ = true;
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h
new file mode 100644
index 0000000..6a61124
--- /dev/null
+++ b/hbase-native-client/core/async-connection.h
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/io/IOBuf.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <memory>
+#include <string>
+
+#include "connection/rpc-client.h"
+#include "core/async-region-locator.h"
+#include "core/configuration.h"
+#include "core/connection-configuration.h"
+#include "core/connection-configuration.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/keyvalue-codec.h"
+#include "core/location-cache.h"
+#include "if/Cell.pb.h"
+#include "serde/table-name.h"
+
+namespace hbase {
+
+class AsyncRpcRetryingCallerFactory;
+
+class AsyncConnection {
+ public:
+  AsyncConnection(){};
+  virtual ~AsyncConnection(){};
+  virtual std::shared_ptr<Configuration> conf() = 0;
+  virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0;
+  virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0;
+  virtual std::shared_ptr<RpcClient> rpc_client() = 0;
+  virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0;
+  virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0;
+  virtual void Close() = 0;
+};
+
+class AsyncConnectionImpl : public AsyncConnection,
+                            public std::enable_shared_from_this<AsyncConnectionImpl> {
+ public:
+  virtual ~AsyncConnectionImpl();
+
+  // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/
+  template <typename... T>
+  static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) {
+    auto conn =
+        std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...));
+    conn->Init();
+    return conn;
+  }
+
+  std::shared_ptr<Configuration> conf() override { return conf_; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<LocationCache> location_cache() { return location_cache_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; }
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+  virtual void Close() override;
+
+ protected:
+  AsyncConnectionImpl() {}
+
+ private:
+  /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
+  static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
+  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
+  static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
+  /** The RPC codec to encode cells. For now it is KeyValueCodec */
+  static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
+
+  std::shared_ptr<Configuration> conf_;
+  std::shared_ptr<ConnectionConfiguration> connection_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<LocationCache> location_cache_;
+  std::shared_ptr<RpcClient> rpc_client_;
+  bool is_closed_ = false;
+
+ private:
+  AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {}
+  void Init();
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-region-locator.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h
new file mode 100644
index 0000000..b0019e0
--- /dev/null
+++ b/hbase-native-client/core/async-region-locator.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <string>
+
+#include "core/region-location.h"
+#include "if/Client.pb.h"
+#include "serde/region-info.h"
+#include "serde/server-name.h"
+#include "serde/table-name.h"
+
+namespace hbase {
+
+class AsyncRegionLocator {
+ public:
+  AsyncRegionLocator() {}
+  virtual ~AsyncRegionLocator() = default;
+
+  /**
+   * The only method clients should use for meta lookups. If corresponding
+   * location is cached, it's returned from the cache, otherwise lookup
+   * in meta table is done, location is cached and then returned.
+   * It's expected that a tiny fraction of invocations incurs a meta scan.
+   * This method is to look up non-meta regions; use LocateMeta() to get the
+   * location of hbase:meta region.
+   *
+   * @param tn Table name of the table to look up. This object must live until
+   * after the future is returned
+   *
+   * @param row Row of the table to look up. This object must live until after the
+   * future is returned.
+   */
+  virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) = 0;
+  /**
+   * Update cached region location, possibly using the information from exception.
+   */
+  virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0;
+};
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 3342e29..5bcad6c 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -35,20 +35,25 @@ using std::chrono::nanoseconds;
 
 namespace hbase {
 
-template <typename CONN, typename RESP, typename RPC_CLIENT>
+class AsyncConnection;
+
+template <typename RESP>
 class SingleRequestCallerBuilder
-    : public std::enable_shared_from_this<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> {
+    : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> {
  public:
-  explicit SingleRequestCallerBuilder(std::shared_ptr<CONN> conn)
+  explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn)
       : conn_(conn),
         table_name_(nullptr),
-        rpc_timeout_nanos_(0),
-        operation_timeout_nanos_(0),
+        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
+        pause_(conn->connection_conf()->pause()),
+        operation_timeout_nanos_(conn->connection_conf()->operation_timeout()),
+        max_retries_(conn->connection_conf()->max_retries()),
+        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
         locate_type_(RegionLocateType::kCurrent) {}
 
   virtual ~SingleRequestCallerBuilder() = default;
 
-  typedef SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT> GenenericThisType;
+  typedef SingleRequestCallerBuilder<RESP> GenenericThisType;
   typedef std::shared_ptr<GenenericThisType> SharedThisPtr;
 
   SharedThisPtr table(std::shared_ptr<TableName> table_name) {
@@ -66,6 +71,21 @@ class SingleRequestCallerBuilder
     return shared_this();
   }
 
+  SharedThisPtr pause(nanoseconds pause) {
+    pause_ = pause;
+    return shared_this();
+  }
+
+  SharedThisPtr max_retries(uint32_t max_retries) {
+    max_retries_ = max_retries;
+    return shared_this();
+  }
+
+  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
+    start_log_errors_count_ = start_log_errors_count;
+    return shared_this();
+  }
+
   SharedThisPtr row(const std::string& row) {
     row_ = row;
     return shared_this();
@@ -76,18 +96,17 @@ class SingleRequestCallerBuilder
     return shared_this();
   }
 
-  SharedThisPtr action(Callable<RESP, RPC_CLIENT> callable) {
+  SharedThisPtr action(Callable<RESP> callable) {
     callable_ = callable;
     return shared_this();
   }
 
   folly::Future<RESP> Call() { return Build()->Call(); }
 
-  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>> Build() {
-    return std::make_shared<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>>(
-        conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(),
-        conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_,
-        conn_->get_conn_conf()->GetStartLogErrorsCount());
+  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() {
+    return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>(
+        conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_,
+        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
   }
 
  private:
@@ -96,28 +115,30 @@ class SingleRequestCallerBuilder
   }
 
  private:
-  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<AsyncConnection> conn_;
   std::shared_ptr<TableName> table_name_;
   nanoseconds rpc_timeout_nanos_;
   nanoseconds operation_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  uint32_t start_log_errors_count_;
   std::string row_;
   RegionLocateType locate_type_;
-  Callable<RESP, RPC_CLIENT> callable_;
+  Callable<RESP> callable_;
 };  // end of SingleRequestCallerBuilder
 
-template <typename CONN>
 class AsyncRpcRetryingCallerFactory {
  private:
-  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<AsyncConnection> conn_;
 
  public:
-  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<CONN> conn) : conn_(conn) {}
+  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn) : conn_(conn) {}
 
   virtual ~AsyncRpcRetryingCallerFactory() = default;
 
-  template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>
-  std::shared_ptr<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> Single() {
-    return std::make_shared<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>>(conn_);
+  template <typename RESP>
+  std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
+    return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_);
   }
 };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index f7a1523..6503301 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -32,6 +32,7 @@
 #include <utility>
 #include <vector>
 #include "connection/rpc-client.h"
+#include "core/async-connection.h"
 #include "core/hbase-rpc-controller.h"
 #include "core/region-location.h"
 #include "exceptions/exception.h"
@@ -60,23 +61,23 @@ using RespConverter = std::function<R(const S&)>;
 template <typename RESP>
 using RpcCallback = std::function<void(const RESP&)>;
 
-template <typename REQ, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+template <typename REQ, typename RESP>
 using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>(
-    std::shared_ptr<RPC_CLIENT>, std::shared_ptr<RegionLocation>,
+    std::shared_ptr<RpcClient>, std::shared_ptr<RegionLocation>,
     std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>;
 
-template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>
-using Callable = std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
-                                                   std::shared_ptr<RegionLocation>,
-                                                   std::shared_ptr<RPC_CLIENT>)>;
+template <typename RESP>
+using Callable =
+    std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
+                                      std::shared_ptr<RegionLocation>, std::shared_ptr<RpcClient>)>;
 
-template <typename CONN, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+template <typename RESP>
 class AsyncSingleRequestRpcRetryingCaller {
  public:
-  AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<CONN> conn,
+  AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
                                       std::shared_ptr<hbase::pb::TableName> table_name,
                                       const std::string& row, RegionLocateType locate_type,
-                                      Callable<RESP, RPC_CLIENT> callable, nanoseconds pause_ns,
+                                      Callable<RESP> callable, nanoseconds pause,
                                       uint32_t max_retries, nanoseconds operation_timeout_nanos,
                                       nanoseconds rpc_timeout_nanos,
                                       uint32_t start_log_errors_count)
@@ -85,14 +86,14 @@ class AsyncSingleRequestRpcRetryingCaller {
         row_(row),
         locate_type_(locate_type),
         callable_(callable),
-        pause_ns_(pause_ns),
+        pause_(pause),
         max_retries_(max_retries),
         operation_timeout_nanos_(operation_timeout_nanos),
         rpc_timeout_nanos_(rpc_timeout_nanos),
         start_log_errors_count_(start_log_errors_count),
         promise_(std::make_shared<folly::Promise<RESP>>()),
         tries_(1) {
-    controller_ = conn_->get_rpc_controller_factory()->NewController();
+    controller_ = conn_->CreateRpcController();
     start_ns_ = TimeUtil::GetNowNanos();
     max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
     exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
@@ -120,9 +121,9 @@ class AsyncSingleRequestRpcRetryingCaller {
       locate_timeout_ns = -1L;
     }
 
-    conn_->get_locator()
-        ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns)
-        .then([this](RegionLocation& loc) { Call(loc); })
+    conn_->region_locator()
+        ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns)
+        .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); })
         .onError([this](const std::exception& e) {
           OnError(e,
                   [this]() -> std::string {
@@ -155,10 +156,9 @@ class AsyncSingleRequestRpcRetryingCaller {
         CompleteExceptionally();
         return;
       }
-      delay_ns =
-          std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1));
+      delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
     } else {
-      delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1);
+      delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
     }
     update_cached_location(error);
     tries_++;
@@ -179,9 +179,10 @@ class AsyncSingleRequestRpcRetryingCaller {
       call_timeout_ns = rpc_timeout_nanos_.count();
     }
 
-    std::shared_ptr<RPC_CLIENT> rpc_client;
+    std::shared_ptr<RpcClient> rpc_client;
     try {
-      rpc_client = conn_->GetRpcClient();
+      // TODO: There is no connection attempt happening here, no need to try-catch.
+      rpc_client = conn_->rpc_client();
     } catch (const IOException& e) {
       OnError(e,
               [&, this]() -> std::string {
@@ -196,7 +197,7 @@ class AsyncSingleRequestRpcRetryingCaller {
                        " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
               },
               [&, this](const std::exception& error) {
-                conn_->get_locator()->UpdateCachedLocation(loc, error);
+                conn_->region_locator()->UpdateCachedLocation(loc, error);
               });
       return;
     }
@@ -219,7 +220,7 @@ class AsyncSingleRequestRpcRetryingCaller {
                            " ms";
                   },
                   [&, this](const std::exception& error) {
-                    conn_->get_locator()->UpdateCachedLocation(loc, error);
+                    conn_->region_locator()->UpdateCachedLocation(loc, error);
                   });
           return;
         });
@@ -244,12 +245,12 @@ class AsyncSingleRequestRpcRetryingCaller {
 
  private:
   folly::HHWheelTimer::UniquePtr retry_timer_;
-  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<AsyncConnection> conn_;
   std::shared_ptr<hbase::pb::TableName> table_name_;
   std::string row_;
   RegionLocateType locate_type_;
-  Callable<RESP, RPC_CLIENT> callable_;
-  nanoseconds pause_ns_;
+  Callable<RESP> callable_;
+  nanoseconds pause_;
   uint32_t max_retries_;
   nanoseconds operation_timeout_nanos_;
   nanoseconds rpc_timeout_nanos_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 81d726f..d0c7921 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -30,9 +30,11 @@
 #include "connection/request.h"
 #include "connection/response.h"
 #include "connection/rpc-client.h"
+#include "core/async-connection.h"
 #include "core/async-rpc-retrying-caller-factory.h"
 #include "core/async-rpc-retrying-caller.h"
 #include "core/client.h"
+#include "core/connection-configuration.h"
 #include "core/hbase-rpc-controller.h"
 #include "core/keyvalue-codec.h"
 #include "core/region-location.h"
@@ -43,6 +45,7 @@
 #include "if/Client.pb.h"
 #include "if/HBase.pb.h"
 #include "test-util/test-util.h"
+#include "utils/time-util.h"
 
 using namespace google::protobuf;
 using namespace hbase;
@@ -53,61 +56,73 @@ using ::testing::Return;
 using ::testing::_;
 using std::chrono::nanoseconds;
 
-class MockRpcControllerFactory {
+class MockAsyncRegionLocator : public AsyncRegionLocator {
  public:
-  MOCK_METHOD0(NewController, std::shared_ptr<HBaseRpcController>());
-};
-
-class MockAsyncConnectionConfiguration {
- public:
-  MOCK_METHOD0(GetPauseNs, nanoseconds());
-  MOCK_METHOD0(GetMaxRetries, int32_t());
-  MOCK_METHOD0(GetStartLogErrorsCount, int32_t());
-  MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds());
-  MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds());
-};
-
-class AsyncRegionLocator {
- public:
-  explicit AsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
       : region_location_(region_location) {}
-  ~AsyncRegionLocator() = default;
-
-  folly::Future<RegionLocation> GetRegionLocation(std::shared_ptr<hbase::pb::TableName>,
-                                                  const std::string&, RegionLocateType, int64_t) {
-    folly::Promise<RegionLocation> promise;
-    promise.setValue(*region_location_);
+  ~MockAsyncRegionLocator() = default;
+
+  folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(const hbase::pb::TableName&,
+                                                              const std::string&,
+                                                              const RegionLocateType,
+                                                              const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(region_location_);
     return promise.getFuture();
   }
 
-  void UpdateCachedLocation(const RegionLocation&, const std::exception&) {}
+  void UpdateCachedLocation(const RegionLocation&, const std::exception&) override {}
 
  private:
   std::shared_ptr<RegionLocation> region_location_;
 };
 
-class MockAsyncConnection {
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
  public:
-  MOCK_METHOD0(get_conn_conf, std::shared_ptr<MockAsyncConnectionConfiguration>());
-  MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr<MockRpcControllerFactory>());
-  MOCK_METHOD0(get_locator, std::shared_ptr<AsyncRegionLocator>());
-  MOCK_METHOD0(GetRpcClient, std::shared_ptr<hbase::RpcClient>());
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      : conn_conf_(conn_conf), rpc_client_(rpc_client), region_locator_(region_locator) {}
+  ~MockAsyncConnection() {}
+  void Init() {
+    caller_factory_ = std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this());
+  }
+
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+
+  void Close() override {}
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
 };
 
 template <typename CONN>
 class MockRawAsyncTableImpl {
  public:
   explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn)
-      : conn_(conn), promise_(std::make_shared<folly::Promise<hbase::Result>>()) {}
+      : conn_(conn), promise_(std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>()) {}
   virtual ~MockRawAsyncTableImpl() = default;
 
   /* implement this in real RawAsyncTableImpl. */
 
   /* in real RawAsyncTableImpl, this should be private. */
-  folly::Future<hbase::Result> GetCall(std::shared_ptr<hbase::RpcClient> rpc_client,
-                                       std::shared_ptr<HBaseRpcController> controller,
-                                       std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
-    hbase::RpcCall<hbase::Request, hbase::Response, hbase::RpcClient> rpc_call = [](
+  folly::Future<std::shared_ptr<hbase::Result>> GetCall(
+      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
+    hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
         std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
         std::shared_ptr<HBaseRpcController> controller,
         std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
@@ -115,7 +130,7 @@ class MockRawAsyncTableImpl {
                                    std::move(preq), User::defaultUser(), "ClientService");
     };
 
-    return Call<hbase::Get, hbase::Request, hbase::Response, hbase::Result>(
+    return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
         rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
         &hbase::ResponseConverter::FromGetResponse);
   }
@@ -126,12 +141,12 @@ class MockRawAsyncTableImpl {
       std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
       std::shared_ptr<RegionLocation> loc, const REQ& req,
       const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
-      const hbase::RpcCall<PREQ, PRESP, hbase::RpcClient>& rpc_call,
-      const RespConverter<std::unique_ptr<RESP>, PRESP>& resp_converter) {
+      const hbase::RpcCall<PREQ, PRESP>& rpc_call,
+      const RespConverter<RESP, PRESP>& resp_converter) {
     rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
         .then([&, this](std::unique_ptr<PRESP> presp) {
-          std::unique_ptr<hbase::Result> result = hbase::ResponseConverter::FromGetResponse(*presp);
-          promise_->setValue(std::move(*result));
+          std::shared_ptr<hbase::Result> result = hbase::ResponseConverter::FromGetResponse(*presp);
+          promise_->setValue(result);
         })
         .onError([this](const std::exception& e) { promise_->setException(e); });
     return promise_->getFuture();
@@ -139,7 +154,7 @@ class MockRawAsyncTableImpl {
 
  private:
   std::shared_ptr<CONN> conn_;
-  std::shared_ptr<folly::Promise<hbase::Result>> promise_;
+  std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
 };
 
 TEST(AsyncRpcRetryTest, TestGetBasic) {
@@ -172,69 +187,50 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
   auto codec = std::make_shared<hbase::KeyValueCodec>();
   auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec);
 
-  /* init rpc controller */
-  auto controller = std::make_shared<HBaseRpcController>();
-
-  /* init rpc controller factory */
-  auto controller_factory = std::make_shared<MockRpcControllerFactory>();
-  EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller));
-
   /* init connection configuration */
-  auto connection_conf = std::make_shared<MockAsyncConnectionConfiguration>();
-  EXPECT_CALL((*connection_conf), GetPauseNs())
-      .Times(1)
-      .WillRepeatedly(Return(nanoseconds(100000000)));
-  EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31));
-  EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9));
-  EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs())
-      .Times(1)
-      .WillRepeatedly(Return(nanoseconds(60000000000)));
-  EXPECT_CALL((*connection_conf), GetOperationTimeoutNs())
-      .Times(1)
-      .WillRepeatedly(Return(nanoseconds(1200000000000)));
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),    // connect_timeout
+      TimeUtil::SecondsToNanos(1200),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),    // rpc_timeout
+      TimeUtil::MillisToNanos(100),    // pause
+      31,                              // max retries
+      9);                              // start log errors count
 
   /* init region locator */
-  auto region_locator = std::make_shared<AsyncRegionLocator>(region_location);
+  auto region_locator = std::make_shared<MockAsyncRegionLocator>(region_location);
 
   /* init hbase client connection */
-  auto conn = std::make_shared<MockAsyncConnection>();
-  EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf));
-  EXPECT_CALL((*conn), get_rpc_controller_factory())
-      .Times(AtLeast(1))
-      .WillRepeatedly(Return(controller_factory));
-  EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator));
-  EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client));
+  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, rpc_client, region_locator);
+  conn->Init();
 
   /* init retry caller factory */
   auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
-  AsyncRpcRetryingCallerFactory<MockAsyncConnection> caller_factory(conn);
 
   /* init request caller builder */
-  auto builder = caller_factory.Single<hbase::Result>();
+  auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
 
   /* call with retry to get result */
   try {
     auto async_caller =
         builder->table(std::make_shared<TableName>(tn))
             ->row(row)
-            ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs())
-            ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs())
-            ->action(
-                [=, &get](
-                    std::shared_ptr<hbase::HBaseRpcController> controller,
-                    std::shared_ptr<hbase::RegionLocation> loc,
-                    std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<hbase::Result> {
-                  return tableImpl->GetCall(rpc_client, controller, loc, get);
-                })
+            ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
+            ->operation_timeout(conn->connection_conf()->operation_timeout())
+            ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+                               std::shared_ptr<hbase::RegionLocation> loc,
+                               std::shared_ptr<hbase::RpcClient> rpc_client)
+                         -> folly::Future<std::shared_ptr<hbase::Result>> {
+                           return tableImpl->GetCall(rpc_client, controller, loc, get);
+                         })
             ->Build();
 
-    hbase::Result result = async_caller->Call().get();
+    auto result = async_caller->Call().get();
 
     // Test the values, should be same as in put executed on hbase shell
-    ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty.";
-    EXPECT_EQ("test2", result.Row());
-    EXPECT_EQ("value2", *(result.Value("d", "2")));
-    EXPECT_EQ("value for extra", *(result.Value("d", "extra")));
+    ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+    EXPECT_EQ("test2", result->Row());
+    EXPECT_EQ("value2", *(result->Value("d", "2")));
+    EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
   } catch (std::exception& e) {
     LOG(ERROR) << e.what();
     throw e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 72680c6..184a6a7 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -47,6 +47,7 @@ class ClientTest : public ::testing::Test {
                               const std::string xml_data) {
     // Remove temp file always
     boost::filesystem::remove((dir + file).c_str());
+    boost::filesystem::create_directories(dir.c_str());
     WriteDataToFile((dir + file), xml_data);
   }
 
@@ -59,6 +60,7 @@ class ClientTest : public ::testing::Test {
   static std::unique_ptr<hbase::TestUtil> test_util;
 
   static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
     test_util = std::make_unique<hbase::TestUtil>();
     test_util->StartMiniCluster(2);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 3f889d4..0053e19 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -22,6 +22,7 @@
 #include <glog/logging.h>
 #include <chrono>
 #include <exception>
+#include <memory>
 #include <utility>
 
 namespace hbase {
@@ -34,53 +35,19 @@ Client::Client() {
                   "hbase-site.xml is absent in the search path or problems in XML parsing";
     throw std::runtime_error("Configuration object not present.");
   }
-  init(conf.value());
+  Init(conf.value());
 }
 
-Client::Client(const hbase::Configuration &conf) { init(conf); }
+Client::Client(const Configuration &conf) { Init(conf); }
 
-void Client::init(const hbase::Configuration &conf) {
-  conf_ = std::make_shared<hbase::Configuration>(conf);
-
-  conn_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
-  // start thread pools
-  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
-  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
-  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
-  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
-
-  std::shared_ptr<Codec> codec = nullptr;
-  if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
-      std::string(KeyValueCodec::kJavaClassName)) {
-    codec = std::make_shared<hbase::KeyValueCodec>();
-  } else {
-    LOG(WARNING) << "Not using RPC Cell Codec";
-  }
-  rpc_client_ =
-      std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
-  location_cache_ =
-      std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
-}
-
-// We can't have the threads continue running after everything is done
-// that leads to an error.
-Client::~Client() {
-  cpu_executor_->stop();
-  io_executor_->stop();
-  if (rpc_client_.get()) rpc_client_->Close();
-}
-
-std::unique_ptr<hbase::Table> Client::Table(const TableName &table_name) {
-  return std::make_unique<hbase::Table>(table_name, location_cache_, rpc_client_, conf_);
+void Client::Init(const Configuration &conf) {
+  auto conf_ = std::make_shared<Configuration>(conf);
+  async_connection_ = AsyncConnectionImpl::Create(conf_);
 }
 
-void Client::Close() {
-  if (is_closed_) return;
-
-  cpu_executor_->stop();
-  io_executor_->stop();
-  if (rpc_client_.get()) rpc_client_->Close();
-  is_closed_ = true;
+std::unique_ptr<Table> Client::Table(const TableName &table_name) {
+  return std::make_unique<hbase::Table>(table_name, async_connection_);
 }
 
+void Client::Close() { async_connection_->Close(); }
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 040bea0..0e11278 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -19,22 +19,14 @@
 
 #pragma once
 
-#include <folly/futures/Future.h>
-#include <folly/io/IOBuf.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-
 #include <memory>
 #include <string>
 
 #include "connection/rpc-client.h"
+#include "core/async-connection.h"
 #include "core/configuration.h"
-#include "core/connection-configuration.h"
-#include "core/hbase-configuration-loader.h"
-#include "core/keyvalue-codec.h"
-#include "core/location-cache.h"
+
 #include "core/table.h"
-#include "if/Cell.pb.h"
 #include "serde/table-name.h"
 
 using hbase::pb::TableName;
@@ -55,13 +47,14 @@ class Client {
    * @param quorum_spec Where to connect to get Zookeeper bootstrap information.
    */
   Client();
-  explicit Client(const hbase::Configuration& conf);
-  ~Client();
+  explicit Client(const Configuration& conf);
+  ~Client() = default;
+
   /**
    * @brief Retrieve a Table implementation for accessing a table.
    * @param - table_name
    */
-  std::unique_ptr<hbase::Table> Table(const TableName& table_name);
+  std::unique_ptr<::hbase::Table> Table(const TableName& table_name);
 
   /**
    * @brief Close the Client connection.
@@ -69,25 +62,12 @@ class Client {
   void Close();
 
  private:
-  /** Constants */
-  /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
-  static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
-  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
-  static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
-  /** The RPC codec to encode cells. For now it is KeyValueCodec */
-  static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
-
   /** Data */
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  std::shared_ptr<hbase::LocationCache> location_cache_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<hbase::Configuration> conf_;
-  std::shared_ptr<hbase::ConnectionConfiguration> conn_conf_;
-  bool is_closed_ = false;
+  std::shared_ptr<AsyncConnectionImpl> async_connection_;
 
+ private:
   /** Methods */
-  void init(const hbase::Configuration& conf);
+  void Init(const Configuration& conf);
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/connection-configuration.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h
index e1e9f87..c6d1d60 100644
--- a/hbase-native-client/core/connection-configuration.h
+++ b/hbase-native-client/core/connection-configuration.h
@@ -56,6 +56,20 @@ class ConnectionConfiguration {
         conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize);
   }
 
+  // Used by tests. operation_timeout also seeds the meta timeout; rpc_timeout seeds read/write.
+  ConnectionConfiguration(nanoseconds connect_timeout, nanoseconds operation_timeout,
+                          nanoseconds rpc_timeout, nanoseconds pause, uint32_t max_retries,
+                          uint32_t start_log_errors_count)
+      : connect_timeout_(connect_timeout),
+        operation_timeout_(operation_timeout),
+        meta_operation_timeout_(operation_timeout),
+        rpc_timeout_(rpc_timeout),
+        read_rpc_timeout_(rpc_timeout),
+        write_rpc_timeout_(rpc_timeout),
+        pause_(pause),
+        max_retries_(max_retries),
+        start_log_errors_count_(start_log_errors_count) {}
+
   nanoseconds connect_timeout() const { return connect_timeout_; }
 
   nanoseconds meta_operation_timeout() const { return meta_operation_timeout_; }
@@ -74,7 +88,7 @@ class ConnectionConfiguration {
   // timeout for each write rpc request
   nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; }
 
-  nanoseconds pause_nanos() const { return pause_; }
+  nanoseconds pause() const { return pause_; }
 
   uint32_t max_retries() const { return max_retries_; }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/location-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 4159af4..8d1ac5f 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -33,6 +33,7 @@ using namespace std::chrono;
 class LocationCacheTest : public ::testing::Test {
  protected:
   static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
     test_util_ = std::make_unique<TestUtil>();
     test_util_->StartMiniCluster(2);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 505f48c..07c3d61 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -158,7 +158,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
 }
 
 Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb::TableName &tn,
-                                                               const std::string &row) {
+                                                               const std::string &row,
+                                                               const RegionLocateType locate_type,
+                                                               const int64_t locate_ns) {
+  // TODO: implement region locate type and timeout
   auto cached_loc = this->GetCachedLocation(tn, row);
   if (cached_loc != nullptr) {
     return cached_loc;
@@ -280,3 +283,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st
   unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
   table_locs->erase(row);
 }
+
+void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) {
+  // TODO: for now only clear the entry (error unused); later inspect RegionMovedException etc.
+  ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index c1c5f62..5e79213 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -32,6 +32,7 @@
 #include <string>
 
 #include "connection/connection-pool.h"
+#include "core/async-region-locator.h"
 #include "core/configuration.h"
 #include "core/meta-utils.h"
 #include "core/region-location.h"
@@ -75,7 +76,7 @@ typedef std::unordered_map<hbase::pb::TableName, std::shared_ptr<PerTableLocatio
 /**
  * Class that can look up and cache locations.
  */
-class LocationCache {
+class LocationCache : public AsyncRegionLocator {
  public:
   /**
    * Constructor.
@@ -127,8 +128,10 @@ class LocationCache {
    * @param row of the table to look up. This object must live until after the
    * future is returned
    */
-  folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(const hbase::pb::TableName &tn,
-                                                              const std::string &row);
+  virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override;
 
   /**
    * Remove the cached location of meta.
@@ -173,6 +176,12 @@ class LocationCache {
    */
   void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
 
+  /**
+   * Update cached region location, possibly using the information from exception.
+   */
+  virtual void UpdateCachedLocation(const RegionLocation &loc,
+                                    const std::exception &error) override;
+
   const std::string &zk_quorum() { return zk_quorum_; }
 
  private:

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/meta-utils.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 24a0360..119520f 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -89,7 +89,7 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st
 }
 
 std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
-  std::vector<std::unique_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
+  std::vector<std::shared_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
   if (results.size() != 1) {
     throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
                              std::to_string(results.size()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
new file mode 100644
index 0000000..641f3c8
--- /dev/null
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/raw-async-table.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+
+namespace hbase {
+
+template <typename RESP>
+std::shared_ptr<SingleRequestCallerBuilder<RESP>> RawAsyncTable::CreateCallerBuilder(
+    std::string row, nanoseconds rpc_timeout) {
+  return connection_->caller_factory()
+      ->Single<RESP>()
+      ->table(table_name_)
+      ->row(row)
+      ->rpc_timeout(rpc_timeout)
+      ->operation_timeout(connection_conf_->operation_timeout())
+      ->pause(connection_conf_->pause())
+      ->max_retries(connection_conf_->max_retries())
+      ->start_log_errors_count(connection_conf_->start_log_errors_count());
+}
+
+template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+folly::Future<RESP> RawAsyncTable::Call(
+    std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<RegionLocation> loc, const REQ& req,
+    const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
+    const RespConverter<RESP, PRESP>& resp_converter) {
+  std::unique_ptr<PREQ> preq = req_converter(req, loc->region_name());
+
+  // The RPC invocation is always the same, so it is not taken as a callable argument.
+  return rpc_client
+      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq),
+                  User::defaultUser(), "ClientService")
+      .then([&](const std::unique_ptr<Response>& presp) {
+        return ResponseConverter::FromGetResponse(*presp);
+        // return resp_converter(*presp); // TODO: SEGFAULTs; the [&] capture may dangle - verify
+      });
+}
+
+Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) {
+  auto caller =
+      CreateCallerBuilder<std::shared_ptr<Result>>(get.Row(), connection_conf_->read_rpc_timeout())
+          ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client)
+                       -> folly::Future<std::shared_ptr<hbase::Result>> {
+                         return Call<hbase::Get, hbase::Request, hbase::Response,
+                                     std::shared_ptr<hbase::Result>>(
+                             rpc_client, controller, loc, get,
+                             &hbase::RequestConverter::ToGetRequest,
+                             &hbase::ResponseConverter::FromGetResponse);
+                       })
+          ->Build();
+
+  // Return the Future obtained from Call(). The Caller must not go out of scope and get
+  // deallocated first: it injects many closures that capture [this, &], and running them after
+  // that would be a use-after-free. So we chain an identity closure capturing caller by value,
+  // which ensures the lifetime of the Caller object is longer than that of the retry lambdas.
+  return caller->Call().then([caller](const auto r) { return r; });
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
new file mode 100644
index 0000000..527c7be
--- /dev/null
+++ b/hbase-native-client/core/raw-async-table.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include <folly/futures/Future.h>
+
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "core/connection-configuration.h"
+#include "core/get.h"
+#include "core/result.h"
+
+using folly::Future;
+using hbase::pb::TableName;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * A low level asynchronous table that should not be used by user applications. The
+ * implementation is required to be thread safe.
+ */
+class RawAsyncTable {
+ public:
+  RawAsyncTable(std::shared_ptr<TableName> table_name, std::shared_ptr<AsyncConnection> connection)
+      : connection_(connection),
+        connection_conf_(connection->connection_conf()),
+        table_name_(table_name),
+        rpc_client_(connection->rpc_client()) {}
+  virtual ~RawAsyncTable() = default;
+
+  Future<std::shared_ptr<Result>> Get(const hbase::Get& get);
+  void Close() {}
+
+ private:
+  /* Data */
+  std::shared_ptr<AsyncConnection> connection_;
+  std::shared_ptr<ConnectionConfiguration> connection_conf_;
+  std::shared_ptr<TableName> table_name_;
+  std::shared_ptr<RpcClient> rpc_client_;
+
+  /* Methods */
+  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+  folly::Future<RESP> Call(
+      std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const REQ& req,
+      const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
+      const RespConverter<RESP, PRESP>& resp_converter);
+
+  template <typename RESP>
+  std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(std::string row,
+                                                                        nanoseconds rpc_timeout);
+};
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 7bb5e5d..b11856c 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -33,14 +33,17 @@ ResponseConverter::ResponseConverter() {}
 
 ResponseConverter::~ResponseConverter() {}
 
-std::unique_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
+// impl note: we are returning shared_ptr's instead of unique_ptr's because these
+// go inside folly::Future's, making the move semantics extremely tricky.
+std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
+  LOG(INFO) << "FromGetResponse";
   auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
-
   return ToResult(get_resp->result(), resp.cell_scanner());
 }
 
-std::unique_ptr<Result> ResponseConverter::ToResult(
+std::shared_ptr<Result> ResponseConverter::ToResult(
     const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
+  LOG(INFO) << "ToResult";
   std::vector<std::shared_ptr<Cell>> vcells;
   for (auto cell : result.cell()) {
     std::shared_ptr<Cell> pcell =
@@ -56,16 +59,17 @@ std::unique_ptr<Result> ResponseConverter::ToResult(
     }
     // TODO: check associated cell count?
   }
-  return std::make_unique<Result>(vcells, result.exists(), result.stale(), result.partial());
+  LOG(INFO) << "Returning Result";
+  return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial());
 }
 
-std::vector<std::unique_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
   auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
-  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
+  LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString();
   int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
                                                    : scan_resp->results_size();
 
-  std::vector<std::unique_ptr<Result>> results{static_cast<size_t>(num_results)};
+  std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)};
   for (int i = 0; i < num_results; i++) {
     if (resp.cell_scanner() != nullptr) {
       // Cells are out in cellblocks.  Group them up again as Results.  How many to read at a
@@ -86,7 +90,7 @@ std::vector<std::unique_ptr<Result>> ResponseConverter::FromScanResponse(const R
         throw std::runtime_error(msg);
       }
       // TODO: handle partial results per Result by checking partial_flag_per_result
-      results[i] = std::make_unique<Result>(vcells, false, scan_resp->stale(), false);
+      results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false);
     } else {
       results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 759b1ce..743c14b 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -36,16 +36,16 @@ class ResponseConverter {
  public:
   ~ResponseConverter();
 
-  static std::unique_ptr<Result> ToResult(const hbase::pb::Result& result,
+  static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result,
                                           const std::unique_ptr<CellScanner>& cell_scanner);
 
   /**
    * @brief Returns a Result object created by PB Message in passed Response object.
    * @param resp - Response object having the PB message.
    */
-  static std::unique_ptr<hbase::Result> FromGetResponse(const Response& resp);
+  static std::shared_ptr<hbase::Result> FromGetResponse(const Response& resp);
 
-  static std::vector<std::unique_ptr<Result>> FromScanResponse(const Response& resp);
+  static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
 
  private:
   // Constructor not required. We have all static methods to extract response from PB messages.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 2ce8fcd..3c54d78 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -25,11 +25,13 @@
 #include <utility>
 #include <vector>
 
+#include "core/async-connection.h"
 #include "core/request-converter.h"
 #include "core/response-converter.h"
 #include "if/Client.pb.h"
 #include "security/user.h"
 #include "serde/server-name.h"
+#include "utils/time-util.h"
 
 using folly::Future;
 using hbase::pb::TableName;
@@ -38,41 +40,28 @@ using std::chrono::milliseconds;
 
 namespace hbase {
 
-Table::Table(const TableName &table_name,
-             const std::shared_ptr<hbase::LocationCache> &location_cache,
-             const std::shared_ptr<hbase::RpcClient> &rpc_client,
-             const std::shared_ptr<hbase::Configuration> &conf)
+Table::Table(const TableName &table_name, std::shared_ptr<AsyncConnection> async_connection)
     : table_name_(std::make_shared<TableName>(table_name)),
-      location_cache_(location_cache),
-      rpc_client_(rpc_client),
-      conf_(conf) {
-  client_retries_ = (conf_) ? conf_->GetInt("hbase.client.retries", client_retries_) : 5;
+      async_connection_(async_connection),
+      conf_(async_connection->conf()) {
+  async_table_ = std::make_unique<RawAsyncTable>(table_name_, async_connection);
 }
 
 Table::~Table() {}
 
-std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
-  auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000));
-  auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name());
-  auto user = User::defaultUser();  // TODO: make User::current() similar to UserUtil
-
-  Future<std::unique_ptr<Response>> f =
-      rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
-                             std::move(req), user, "ClientService");
-  auto resp = f.get();
-
-  return hbase::ResponseConverter::FromGetResponse(*resp);
+std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
+  auto context = async_table_->Get(get);
+  return context.get(operation_timeout());
 }
 
-void Table::Close() {
-  if (is_closed_) return;
-
-  if (rpc_client_.get()) rpc_client_->Close();
-  is_closed_ = true;
+milliseconds Table::operation_timeout() const {
+  return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout());
 }
 
+void Table::Close() { async_table_->Close(); }
+
 std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) {
-  return location_cache_->LocateRegion(*table_name_, row).get();
+  return async_connection_->region_locator()->LocateRegion(*table_name_, row).get();
 }
 
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index f82382e..93ab91b 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -19,15 +19,18 @@
 
 #pragma once
 
+#include <chrono>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "connection/rpc-client.h"
+#include "core/async-connection.h"
 #include "core/client.h"
 #include "core/configuration.h"
 #include "core/get.h"
 #include "core/location-cache.h"
+#include "core/raw-async-table.h"
 #include "core/result.h"
 #include "serde/table-name.h"
 
@@ -41,16 +44,17 @@ class Table {
   /**
    * Constructors
    */
-  Table(const TableName &table_name, const std::shared_ptr<hbase::LocationCache> &location_cache,
-        const std::shared_ptr<hbase::RpcClient> &rpc_client,
-        const std::shared_ptr<hbase::Configuration> &conf);
+  Table(const TableName &table_name, std::shared_ptr<AsyncConnection> async_connection);
   ~Table();
 
   /**
    * @brief - Returns a Result object for the constructed Get.
    * @param - get Get object to perform HBase Get operation.
    */
-  std::unique_ptr<hbase::Result> Get(const hbase::Get &get);
+  std::shared_ptr<hbase::Result> Get(const hbase::Get &get);
+
+  // TODO: next jira
+  // std::vector<std::unique_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets);
 
   /**
    * @brief - Close the client connection.
@@ -64,11 +68,11 @@ class Table {
 
  private:
   std::shared_ptr<TableName> table_name_;
-  std::shared_ptr<hbase::LocationCache> location_cache_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<AsyncConnection> async_connection_;
   std::shared_ptr<hbase::Configuration> conf_;
-  bool is_closed_ = false;
-  // default 5 retries. over-ridden in constructor.
-  int client_retries_ = 5;
+  std::unique_ptr<RawAsyncTable> async_table_;
+
+ private:
+  milliseconds operation_timeout() const;
 };
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/security/user.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h
index 372ab44..035af31 100644
--- a/hbase-native-client/security/user.h
+++ b/hbase-native-client/security/user.h
@@ -37,6 +37,7 @@ class User {
   static bool IsSecurityEnabled(const Configuration& conf) {
     return conf.Get("hbase.security.authentication", "").compare(kKerberos) == 0;
   }
+
  private:
   std::string user_name_;
 };

http://git-wip-us.apache.org/repos/asf/hbase/blob/a772ba32/hbase-native-client/utils/time-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h
index bbc3b35..183260b 100644
--- a/hbase-native-client/utils/time-util.h
+++ b/hbase-native-client/utils/time-util.h
@@ -23,28 +23,45 @@
 #include <string>
 using std::chrono::nanoseconds;
 using std::chrono::milliseconds;
+using std::chrono::seconds;
 
 namespace hbase {
 class TimeUtil {
  public:
-  static int64_t ToMillis(const int64_t& nanos) {
+  static inline int64_t ToMillis(const int64_t& nanos) {
     return std::chrono::duration_cast<milliseconds>(nanoseconds(nanos)).count();
   }
 
-  static std::string ToMillisStr(const nanoseconds& nanos) {
+  static inline milliseconds ToMillis(const nanoseconds& nanos) {
+    return std::chrono::duration_cast<milliseconds>(nanoseconds(nanos));
+  }
+
+  static inline nanoseconds ToNanos(const milliseconds& millis) {
+    return std::chrono::duration_cast<nanoseconds>(millis);
+  }
+
+  static inline nanoseconds MillisToNanos(const int64_t& millis) {
+    return std::chrono::duration_cast<nanoseconds>(milliseconds(millis));
+  }
+
+  static inline nanoseconds SecondsToNanos(const int64_t& secs) {
+    return std::chrono::duration_cast<nanoseconds>(seconds(secs));
+  }
+
+  static inline std::string ToMillisStr(const nanoseconds& nanos) {
     return std::to_string(std::chrono::duration_cast<milliseconds>(nanos).count());
   }
 
-  static int64_t GetNowNanos() {
+  static inline int64_t GetNowNanos() {
     auto duration = std::chrono::high_resolution_clock::now().time_since_epoch();
     return std::chrono::duration_cast<nanoseconds>(duration).count();
   }
 
-  static int64_t ElapsedMillis(const int64_t& start_ns) {
+  static inline int64_t ElapsedMillis(const int64_t& start_ns) {
     return std::chrono::duration_cast<milliseconds>(nanoseconds(GetNowNanos() - start_ns)).count();
   }
 
-  static std::string ElapsedMillisStr(const int64_t& start_ns) {
+  static inline std::string ElapsedMillisStr(const int64_t& start_ns) {
     return std::to_string(
         std::chrono::duration_cast<milliseconds>(nanoseconds(GetNowNanos() - start_ns)).count());
   }


Mime
View raw message