hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-17465 [C++] implement request retry mechanism over RPC (Xiaobing Zhou)
Date Sat, 04 Mar 2017 03:17:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 7d284f6b5 -> fc2accc56


HBASE-17465 [C++] implement request retry mechanism over RPC (Xiaobing Zhou)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc2accc5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc2accc5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc2accc5

Branch: refs/heads/HBASE-14850
Commit: fc2accc56d0d9b04bc495662b5f0bfbd6e859449
Parents: 7d284f6
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Mar 3 19:15:16 2017 -0800
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Mar 3 19:15:16 2017 -0800

----------------------------------------------------------------------
 hbase-native-client/Makefile                    |   2 +-
 hbase-native-client/bin/start-docker.sh         |   2 +-
 .../connection/connection-factory.cc            |   7 +-
 .../connection/connection-factory.h             |   3 +-
 .../connection/connection-pool.cc               |   4 +-
 hbase-native-client/connection/rpc-client.cc    |  42 +--
 hbase-native-client/connection/rpc-client.h     |  42 +--
 hbase-native-client/core/BUCK                   |  14 +
 .../core/async-rpc-retrying-caller-factory.cc   |  22 ++
 .../core/async-rpc-retrying-caller-factory.h    | 124 +++++++++
 .../core/async-rpc-retrying-caller.cc           |  22 ++
 .../core/async-rpc-retrying-caller.h            | 266 +++++++++++++++++++
 .../core/async-rpc-retrying-test.cc             | 255 ++++++++++++++++++
 hbase-native-client/core/client.cc              |   3 +-
 hbase-native-client/core/client.h               |   2 +-
 hbase-native-client/core/filter.h               |   2 +-
 .../core/hbase-rpc-controller.cc                |  22 ++
 hbase-native-client/core/hbase-rpc-controller.h |  56 ++++
 hbase-native-client/core/location-cache.cc      |   1 +
 hbase-native-client/core/region-location.h      |  10 +-
 hbase-native-client/core/response_converter.cc  |   1 +
 hbase-native-client/core/response_converter.h   |   1 +
 hbase-native-client/core/table.cc               |   4 +
 hbase-native-client/core/table.h                |   5 +
 hbase-native-client/exceptions/BUCK             |  24 ++
 hbase-native-client/exceptions/exception.h      | 104 ++++++++
 hbase-native-client/utils/BUCK                  |   7 +-
 hbase-native-client/utils/connection-util.cc    |  26 ++
 hbase-native-client/utils/connection-util.h     |  62 +++++
 hbase-native-client/utils/sys-util.h            |  39 +++
 hbase-native-client/utils/time-util.h           |  52 ++++
 31 files changed, 1131 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/Makefile
----------------------------------------------------------------------
diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile
index 84ae556..b926220 100644
--- a/hbase-native-client/Makefile
+++ b/hbase-native-client/Makefile
@@ -22,7 +22,7 @@ LD:=g++
 DEBUG_PATH = build/debug
 RELEASE_PATH = build/release
 PROTO_SRC_DIR = build/if
-MODULES = connection core serde test-util utils security
+MODULES = connection core serde test-util utils security exceptions
 SRC_DIR = $(MODULES)
 DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES))
 RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES))

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/bin/start-docker.sh
----------------------------------------------------------------------
diff --git a/hbase-native-client/bin/start-docker.sh b/hbase-native-client/bin/start-docker.sh
index 1380cdf..8b017a0 100755
--- a/hbase-native-client/bin/start-docker.sh
+++ b/hbase-native-client/bin/start-docker.sh
@@ -56,7 +56,7 @@ docker build -t hbase_native .
 
 # After the image is built run the thing
 docker run -p 16050:16050/tcp \
-           -v ${BASE_DIR}/..:/usr/src/hbase \
+         -v ${BASE_DIR}/..:/usr/src/hbase \
            -v ~/.m2:/root/.m2 \
            -it hbase_native  /bin/bash
 popd

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 2f7e75c..832b00f 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -31,11 +31,10 @@ using std::chrono::milliseconds;
 using std::chrono::nanoseconds;
 
 ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
-                                     std::shared_ptr<Codec> codec,
-									 nanoseconds connect_timeout)
+                                     std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
     : connect_timeout_(connect_timeout),
-	  io_pool_(io_pool),
-	  pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
+      io_pool_(io_pool),
+      pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
 
 std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
   auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index fbcb6ef..32d0bf7 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -44,8 +44,7 @@ class ConnectionFactory {
    * There should only be one ConnectionFactory per client.
    */
   ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
-                    std::shared_ptr<Codec> codec,
-					nanoseconds connect_timeout = nanoseconds(0));
+                    std::shared_ptr<Codec> codec, nanoseconds connect_timeout = nanoseconds(0));
 
   /** Default Destructor */
   virtual ~ConnectionFactory() = default;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index b18ee89..4fe4610 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -22,6 +22,7 @@
 #include <folly/SocketAddress.h>
 #include <wangle/service/Service.h>
 
+#include <folly/Logging.h>
 #include <memory>
 #include <utility>
 
@@ -34,8 +35,7 @@ using folly::SharedMutexWritePriority;
 using folly::SocketAddress;
 
 ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                               std::shared_ptr<Codec> codec,
-							   nanoseconds connect_timeout)
+                               std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
     : cf_(std::make_shared<ConnectionFactory>(io_executor, codec, connect_timeout)),
       clients_(),
       connections_(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index c61a73e..5fa1138 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -18,27 +18,17 @@
  */
 
 #include "connection/rpc-client.h"
+
+#include <folly/Logging.h>
 #include <unistd.h>
 #include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include <memory>
+#include <string>
 
 using hbase::RpcClient;
-using hbase::AbstractRpcChannel;
 
 namespace hbase {
 
-class RpcChannelImplementation : public AbstractRpcChannel {
- public:
-  RpcChannelImplementation(std::shared_ptr<RpcClient> rpc_client, const std::string& host,
-                           uint16_t port, std::shared_ptr<User> ticket, int rpc_timeout)
-      : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {}
-
-  void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request,
-                  Message* response, Closure* done) override {
-    rpc_client_->CallMethod(method, controller, request, response, done, host_, port_, ticket_);
-  }
-};
-}  // namespace hbase
-
 RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
                      std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
     : io_executor_(io_executor) {
@@ -80,26 +70,4 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
 std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
   return cp_->GetConnection(remote_id);
 }
-
-std::shared_ptr<RpcChannel> RpcClient::CreateRpcChannel(const std::string& host, uint16_t port,
-                                                        std::shared_ptr<User> ticket,
-                                                        int rpc_timeout) {
-  std::shared_ptr<RpcChannelImplementation> channel = std::make_shared<RpcChannelImplementation>(
-      shared_from_this(), host, port, ticket, rpc_timeout);
-
-  /* static_pointer_cast is safe since RpcChannelImplementation derives
-   * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */
-  return std::static_pointer_cast<RpcChannel>(channel);
-}
-
-void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller,
-                           const Message* req_msg, Message* resp_msg, Closure* done,
-                           const std::string& host, uint16_t port, std::shared_ptr<User> ticket) {
-  std::shared_ptr<Message> shared_req(const_cast<Message*>(req_msg));
-  std::shared_ptr<Message> shared_resp(resp_msg);
-
-  std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name());
-
-  AsyncCall(host, port, std::move(req), ticket, method->service()->name())
-      .then([done, this](std::unique_ptr<Response> resp) { done->Run(); });
-}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/connection/rpc-client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 5c11ab5..d416ceb 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -38,24 +38,15 @@ using hbase::ConnectionPool;
 using hbase::RpcConnection;
 using hbase::security::User;
 
-using google::protobuf::MethodDescriptor;
-using google::protobuf::RpcChannel;
 using google::protobuf::Message;
-using google::protobuf::RpcController;
-using google::protobuf::Closure;
-
 using std::chrono::nanoseconds;
 
-class RpcChannelImplementation;
-
 namespace hbase {
 
-class RpcClient : public std::enable_shared_from_this<RpcClient> {
-  friend class RpcChannelImplementation;
-
+class RpcClient {
  public:
-  RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-            std::shared_ptr<Codec> codec, nanoseconds connect_timeout);
+  RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, std::shared_ptr<Codec> codec,
+            nanoseconds connect_timeout = nanoseconds(0));
 
   virtual ~RpcClient() { Close(); }
 
@@ -79,40 +70,13 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
 
   virtual void Close();
 
-  virtual std::shared_ptr<RpcChannel> CreateRpcChannel(const std::string &host, uint16_t port,
-                                                       std::shared_ptr<User> ticket,
-                                                       int rpc_timeout);
-
   std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; }
 
  private:
-  void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg,
-                  Message *resp_msg, Closure *done, const std::string &host, uint16_t port,
-                  std::shared_ptr<User> ticket);
   std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id);
 
  private:
   std::shared_ptr<ConnectionPool> cp_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
 };
-
-class AbstractRpcChannel : public RpcChannel {
- public:
-  AbstractRpcChannel(std::shared_ptr<RpcClient> rpc_client, const std::string &host, uint16_t port,
-                     std::shared_ptr<User> ticket, int rpc_timeout)
-      : rpc_client_(rpc_client),
-        host_(host),
-        port_(port),
-        ticket_(ticket),
-        rpc_timeout_(rpc_timeout) {}
-
-  virtual ~AbstractRpcChannel() = default;
-
- protected:
-  std::shared_ptr<RpcClient> rpc_client_;
-  std::string host_;
-  uint16_t port_;
-  std::shared_ptr<User> ticket_;
-  int rpc_timeout_;
-};
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index e541d8f..2f4f6c1 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -40,6 +40,9 @@ cxx_library(
         "request_converter.h",
         "response_converter.h",
         "table.h",
+        "async-rpc-retrying-caller-factory.h",
+        "async-rpc-retrying-caller.h",
+        "hbase-rpc-controller.h",
     ],
     srcs=[
         "cell.cc",
@@ -58,6 +61,8 @@ cxx_library(
         "table.cc",
     ],
     deps=[
+        "//exceptions:exceptions",
+        "//utils:utils",
         "//connection:connection",
         "//if:if",
         "//serde:serde",
@@ -96,6 +101,15 @@ cxx_test(
     deps=[":core",],
     run_test_separately=True,)
 cxx_test(
+    name="retry-test",
+    srcs=["async-rpc-retrying-test.cc",],
+    deps=[
+        ":core",
+        "//test-util:test-util",
+        "//exceptions:exceptions",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="time_range-test",
     srcs=["time_range-test.cc",],
     deps=[":core",],

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
new file mode 100644
index 0000000..0ac9cac
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-rpc-retrying-caller-factory.h"
+
+namespace hbase {}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
new file mode 100644
index 0000000..3342e29
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/async/EventBase.h>
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include "connection/rpc-client.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+using hbase::pb::TableName;
+using std::chrono::nanoseconds;
+
+namespace hbase {
+
+/** Fluent builder collecting the settings of one AsyncSingleRequestRpcRetryingCaller. */
+template <typename CONN, typename RESP, typename RPC_CLIENT>
+class SingleRequestCallerBuilder
+    : public std::enable_shared_from_this<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> {
+ public:
+  using GenenericThisType = SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>;
+  using SharedThisPtr = std::shared_ptr<GenenericThisType>;
+
+  explicit SingleRequestCallerBuilder(std::shared_ptr<CONN> conn) : conn_(conn) {}
+
+  virtual ~SingleRequestCallerBuilder() = default;
+
+  // Chainable setters: each one stores its argument and returns this builder.
+  SharedThisPtr table(std::shared_ptr<TableName> table_name) {
+    table_name_ = table_name;
+    return shared_this();
+  }
+
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) {
+    operation_timeout_nanos_ = operation_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr row(const std::string& row) {
+    row_ = row;
+    return shared_this();
+  }
+
+  SharedThisPtr locate_type(RegionLocateType locate_type) {
+    locate_type_ = locate_type;
+    return shared_this();
+  }
+
+  SharedThisPtr action(Callable<RESP, RPC_CLIENT> callable) {
+    callable_ = callable;
+    return shared_this();
+  }
+
+  // Builds a caller and starts it. NOTE(review): the caller is a temporary that dies when this
+  // statement ends; confirm nothing it started still refers to it afterwards.
+  folly::Future<RESP> Call() { return Build()->Call(); }
+
+  /** Creates the caller from the collected settings plus pause/retry values read via conn_. */
+  std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>> Build() {
+    return std::make_shared<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>>(
+        conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(),
+        conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_,
+        conn_->get_conn_conf()->GetStartLogErrorsCount());
+  }
+
+ private:
+  // Typed shared_from_this(); the builder therefore has to be owned by a std::shared_ptr.
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this<GenenericThisType>::shared_from_this();
+  }
+
+  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<TableName> table_name_ = nullptr;
+  nanoseconds rpc_timeout_nanos_{0};
+  nanoseconds operation_timeout_nanos_{0};
+  std::string row_;
+  RegionLocateType locate_type_ = RegionLocateType::kCurrent;
+  Callable<RESP, RPC_CLIENT> callable_;
+};  // end of SingleRequestCallerBuilder
+
+/** Hands out retrying-caller builders that all share the connection given at construction. */
+template <typename CONN>
+class AsyncRpcRetryingCallerFactory {
+ public:
+  explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<CONN> conn) : conn_(conn) {}
+  virtual ~AsyncRpcRetryingCallerFactory() = default;
+
+  /** Returns a new single-request builder constructed with conn_. */
+  template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+  std::shared_ptr<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> Single() {
+    return std::make_shared<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>>(conn_);
+  }
+ private:
+  std::shared_ptr<CONN> conn_;
+};
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
new file mode 100644
index 0000000..743b6bb
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-rpc-retrying-caller.h"
+
+namespace hbase {} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/async-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
new file mode 100644
index 0000000..f7a1523
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+#include "connection/rpc-client.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/region-location.h"
+#include "exceptions/exception.h"
+#include "if/HBase.pb.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+template <typename T>
+using Supplier = std::function<T()>;  // callable taking no arguments and returning a T
+
+template <typename T>
+using Consumer = std::function<void(T)>;  // callable taking a T by value, returning nothing
+
+template <typename R, typename S, typename... I>
+using ReqConverter = std::function<R(const S&, const I&...)>;  // maps an S (plus extras) to R
+
+template <typename R, typename S>
+using RespConverter = std::function<R(const S&)>;  // maps an S to an R
+
+template <typename RESP>
+using RpcCallback = std::function<void(const RESP&)>;  // receives a RESP by const reference
+
+template <typename REQ, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>(
+    std::shared_ptr<RPC_CLIENT>, std::shared_ptr<RegionLocation>,
+    std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>;  // takes ownership of REQ
+
+template <typename RESP, typename RPC_CLIENT = hbase::RpcClient>  // yields a future of RESP
+using Callable = std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>,
+                                                   std::shared_ptr<RegionLocation>,
+                                                   std::shared_ptr<RPC_CLIENT>)>;
+
+/**
+ * Runs one request against the region hosting a row: locates the region, invokes the callable,
+ * and after a failure schedules another locate-then-call round until the attempts or the
+ * operation timeout are used up. The outcome is delivered through the future from Call().
+ */
+template <typename CONN, typename RESP, typename RPC_CLIENT = hbase::RpcClient>
+class AsyncSingleRequestRpcRetryingCaller {
+ public:
+  AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<CONN> conn,
+                                      std::shared_ptr<hbase::pb::TableName> table_name,
+                                      const std::string& row, RegionLocateType locate_type,
+                                      Callable<RESP, RPC_CLIENT> callable, nanoseconds pause_ns,
+                                      uint32_t max_retries, nanoseconds operation_timeout_nanos,
+                                      nanoseconds rpc_timeout_nanos,
+                                      uint32_t start_log_errors_count)
+      : conn_(conn),
+        table_name_(table_name),
+        row_(row),
+        locate_type_(locate_type),
+        callable_(callable),
+        pause_ns_(pause_ns),
+        max_retries_(max_retries),
+        operation_timeout_nanos_(operation_timeout_nanos),
+        rpc_timeout_nanos_(rpc_timeout_nanos),
+        start_log_errors_count_(start_log_errors_count),
+        promise_(std::make_shared<folly::Promise<RESP>>()),
+        tries_(1) {
+    controller_ = conn_->get_rpc_controller_factory()->NewController();
+    start_ns_ = TimeUtil::GetNowNanos();
+    max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+    exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+    retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
+  }
+
+  virtual ~AsyncSingleRequestRpcRetryingCaller() {}
+
+  /** Starts the first attempt and returns the future that receives the final outcome. */
+  folly::Future<RESP> Call() {
+    auto f = promise_->getFuture();
+    LocateThenCall();
+    return f;
+  }
+
+ private:
+  /** Looks up the region for row_ and passes it to Call(loc); lookup errors go to OnError. */
+  void LocateThenCall() {
+    int64_t locate_timeout_ns = -1L;  // -1 is what the locator gets without an operation timeout
+    if (operation_timeout_nanos_.count() > 0) {
+      locate_timeout_ns = RemainingTimeNs();
+      if (locate_timeout_ns <= 0) {
+        CompleteExceptionally();
+        return;
+      }
+    }
+
+    conn_->get_locator()
+        ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns)
+        .then([this](RegionLocation& loc) { Call(loc); })
+        .onError([this](const std::exception& e) {
+          OnError(e,
+                  [this]() -> std::string {
+                    return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" +
+                           table_name_->qualifier() + this->RetryStateStr();
+                  },
+                  [](const std::exception& error) {});
+        });
+  }
+
+  // Records the failure, then either fails the promise (non-retryable error, attempts used up,
+  // no time left) or arms the timer for another round. err_msg is accepted but never invoked.
+  void OnError(const std::exception& error, Supplier<std::string> err_msg,
+               Consumer<std::exception> update_cached_location) {
+    ThrowableWithExtraContext twec(std::make_shared<std::exception>(error),
+                                   TimeUtil::GetNowNanos());
+    exceptions_->push_back(twec);
+    // Bound by max_attempts_ (the limit the messages report), not by the raw retry count.
+    if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) ||
+        tries_ >= max_attempts_) {
+      CompleteExceptionally();
+      return;
+    }
+
+    int64_t delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1);
+    if (operation_timeout_nanos_.count() > 0) {
+      const int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+      if (max_delay_ns <= 0) {
+        CompleteExceptionally();
+        return;
+      }
+      delay_ns = std::min(max_delay_ns, delay_ns);
+    }
+    update_cached_location(error);
+    tries_++;
+    retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); },
+                                    milliseconds(TimeUtil::ToMillis(delay_ns)));
+  }
+
+  // Sends the request to loc. loc is copied into a shared_ptr so the asynchronous error
+  // continuation never touches the caller's reference after this frame has returned.
+  void Call(const RegionLocation& loc) {
+    int64_t call_timeout_ns = rpc_timeout_nanos_.count();
+    if (operation_timeout_nanos_.count() > 0) {
+      const int64_t remaining_ns = RemainingTimeNs();
+      if (remaining_ns <= 0) {
+        CompleteExceptionally();
+        return;
+      }
+      call_timeout_ns = std::min(remaining_ns, call_timeout_ns);
+    }
+
+    auto loc_ptr = std::make_shared<RegionLocation>(loc);
+    std::shared_ptr<RPC_CLIENT> rpc_client;
+    try {
+      rpc_client = conn_->GetRpcClient();
+    } catch (const IOException& e) {
+      OnError(e,
+              [this, loc_ptr]() { return this->FailureMsg("Get async rpc_client to ", *loc_ptr); },
+              [this, loc_ptr](const std::exception& error) {
+                conn_->get_locator()->UpdateCachedLocation(*loc_ptr, error);
+              });
+      return;
+    }
+
+    ResetController(controller_, call_timeout_ns);
+
+    callable_(controller_, loc_ptr, rpc_client)
+        .then([this](const RESP& resp) { this->promise_->setValue(resp); })
+        .onError([this, loc_ptr](const std::exception& e) {
+          OnError(e, [this, loc_ptr]() { return this->FailureMsg("Call to ", *loc_ptr); },
+                  [this, loc_ptr](const std::exception& error) {
+                    conn_->get_locator()->UpdateCachedLocation(*loc_ptr, error);
+                  });
+        });
+  }
+
+  /** Describes a failed attempt against loc; action is the leading phrase of the message. */
+  std::string FailureMsg(const std::string& action, const RegionLocation& loc) {
+    return action + folly::sformat("{0}:{1}", loc.server_name().host_name(),
+                                   loc.server_name().port()) +
+           " for '" + row_ + "' in " + loc.DebugString() + " of " + table_name_->namespace_() +
+           "::" + table_name_->qualifier() + RetryStateStr();
+  }
+
+  /** Renders the attempt count and timing details appended to every error message. */
+  std::string RetryStateStr() {
+    return " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
+           std::to_string(max_attempts_) + ", timeout = " +
+           TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " +
+           TimeUtil::ElapsedMillisStr(start_ns_) + " ms";
+  }
+
+  /** Fails the promise with a RetriesExhaustedException carrying the collected errors. */
+  void CompleteExceptionally() {
+    promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+  }
+
+  /** Nanoseconds left of the operation timeout, measured from construction; may be <= 0. */
+  int64_t RemainingTimeNs() {
+    return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+  }
+
+  /** Resets the controller; a non-negative timeout is applied in ms, capped at INT_MAX. */
+  static void ResetController(std::shared_ptr<HBaseRpcController> controller,
+                              const int64_t& timeout_ns) {
+    controller->Reset();
+    if (timeout_ns >= 0) {
+      controller->set_call_timeout(
+          milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns))));
+    }
+  }
+
+ private:
+  // Declared ahead of retry_timer_ so the timer is destroyed before the EventBase it runs on.
+  folly::EventBase event_base_;
+  folly::HHWheelTimer::UniquePtr retry_timer_;
+  std::shared_ptr<CONN> conn_;
+  std::shared_ptr<hbase::pb::TableName> table_name_;
+  std::string row_;
+  RegionLocateType locate_type_;
+  Callable<RESP, RPC_CLIENT> callable_;
+  nanoseconds pause_ns_;
+  uint32_t max_retries_;
+  nanoseconds operation_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<folly::Promise<RESP>> promise_;
+  std::shared_ptr<HBaseRpcController> controller_;
+  uint64_t start_ns_;
+  uint32_t tries_;
+  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
+  uint32_t max_attempts_;
+};
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
new file mode 100644
index 0000000..a9b0017
--- /dev/null
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/stubs/callback.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <functional>
+#include <string>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "connection/rpc-client.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "core/client.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/keyvalue-codec.h"
+#include "core/region-location.h"
+#include "core/request_converter.h"
+#include "core/response_converter.h"
+#include "core/result.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "test-util/test-util.h"
+
+using namespace google::protobuf;
+using namespace hbase;
+using namespace hbase::pb;
+using namespace std::placeholders;
+using namespace testing;
+using ::testing::Return;
+using ::testing::_;
+using std::chrono::nanoseconds;
+
+/* gmock stand-in for the RPC controller factory; NewController() is the only mocked method. */
+class MockRpcControllerFactory {
+ public:
+  MOCK_METHOD0(NewController, std::shared_ptr<HBaseRpcController>());
+};
+
+/*
+ * gmock stand-in for the connection configuration. Each getter is a zero-argument mock whose
+ * return value is supplied by the test through EXPECT_CALL.
+ */
+class MockAsyncConnectionConfiguration {
+ public:
+  MOCK_METHOD0(GetPauseNs, nanoseconds());
+  MOCK_METHOD0(GetMaxRetries, int32_t());
+  MOCK_METHOD0(GetStartLogErrorsCount, int32_t());
+  MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds());
+  MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds());
+};
+
+/*
+ * Test double for the region locator: every lookup resolves immediately to the single
+ * RegionLocation handed to the constructor, and cache updates are ignored.
+ */
+class AsyncRegionLocator {
+ public:
+  explicit AsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  ~AsyncRegionLocator() = default;
+
+  /* All arguments are ignored; the returned future already holds a copy of the location. */
+  folly::Future<RegionLocation> GetRegionLocation(std::shared_ptr<hbase::pb::TableName>,
+                                                  const std::string&, RegionLocateType, int64_t) {
+    folly::Promise<RegionLocation> fulfilled;
+    auto location_future = fulfilled.getFuture();
+    fulfilled.setValue(*region_location_);
+    return location_future;
+  }
+
+  /* Intentionally a no-op: this stub keeps no cache. */
+  void UpdateCachedLocation(const RegionLocation&, const std::exception&) {}
+
+ private:
+  std::shared_ptr<RegionLocation> region_location_;
+};
+
+/*
+ * gmock stand-in for the async connection. It hands out the mocked configuration and
+ * controller factory, the stub AsyncRegionLocator and a real hbase::RpcClient.
+ */
+class MockAsyncConnection {
+ public:
+  MOCK_METHOD0(get_conn_conf, std::shared_ptr<MockAsyncConnectionConfiguration>());
+  MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr<MockRpcControllerFactory>());
+  MOCK_METHOD0(get_locator, std::shared_ptr<AsyncRegionLocator>());
+  MOCK_METHOD0(GetRpcClient, std::shared_ptr<hbase::RpcClient>());
+};
+
+/*
+ * Minimal stand-in for the future RawAsyncTableImpl. It issues a Get over the supplied
+ * RpcClient and converts the protobuf response into a client-side result.
+ *
+ * Every Call() creates its own promise, so the same instance can be invoked repeatedly
+ * (e.g. once per retry attempt); a promise only hands out its future once.
+ */
+template <typename CONN>
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  /*
+   * Sends `get` to the region server named by `loc` and returns the future result.
+   * The RPC goes to the "ClientService" service as the default user.
+   */
+  folly::Future<hbase::Result> GetCall(std::shared_ptr<hbase::RpcClient> rpc_client,
+                                       std::shared_ptr<HBaseRpcController> controller,
+                                       std::shared_ptr<RegionLocation> loc, const hbase::Get& get) {
+    hbase::RpcCall<hbase::Request, hbase::Response, hbase::RpcClient> rpc_call = [](
+        std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
+        std::shared_ptr<HBaseRpcController> controller,
+        std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
+      return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                                   std::move(preq), User::defaultUser(), "ClientService");
+    };
+
+    return Call<hbase::Get, hbase::Request, hbase::Response, hbase::Result>(
+        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
+        &hbase::ResponseConverter::FromGetResponse);
+  }
+
+  /* in real RawAsyncTableImpl, this should be private. */
+  /*
+   * Generic request pipeline: convert `req` with `req_converter`, run it through `rpc_call`,
+   * then convert the protobuf response with `resp_converter`.
+   *
+   * The continuations capture the promise and converter by value because they may run
+   * after this function has returned.
+   */
+  template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+  folly::Future<RESP> Call(
+      std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+      std::shared_ptr<RegionLocation> loc, const REQ& req,
+      const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
+      const hbase::RpcCall<PREQ, PRESP, hbase::RpcClient>& rpc_call,
+      const RespConverter<std::unique_ptr<RESP>, PRESP>& resp_converter) {
+    auto promise = std::make_shared<folly::Promise<RESP>>();
+    rpc_call(rpc_client, loc, controller, req_converter(req, loc->region_name()))
+        .then([promise, resp_converter](std::unique_ptr<PRESP> presp) {
+          std::unique_ptr<RESP> result = resp_converter(*presp);
+          promise->setValue(std::move(*result));
+        })
+        .onError([promise](const std::exception& e) { promise->setException(e); });
+    return promise->getFuture();
+  }
+
+ private:
+  std::shared_ptr<CONN> conn_;
+};
+
+/*
+ * End-to-end check of the single-request retrying caller: populate a table through the
+ * HBase shell, run a Get through AsyncRpcRetryingCallerFactory against mocked connection
+ * pieces, and verify the returned cells.
+ */
+TEST(AsyncRpcRetryTest, TestGetBasic) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+
+  // Using TestUtil to populate test data. Owned by a unique_ptr so it is released even
+  // when the call below throws.
+  std::unique_ptr<hbase::TestUtil> test_util(new hbase::TestUtil());
+  test_util->RunShellCmd("create 't', 'd'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::Configuration conf;
+
+  // Create a client
+  Client client(conf);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  /* init region location and rpc channel */
+  auto region_location = table->GetRegionLocation(row);
+
+  auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec);
+
+  /* init rpc controller */
+  auto controller = std::make_shared<HBaseRpcController>();
+
+  /* init rpc controller factory */
+  auto controller_factory = std::make_shared<MockRpcControllerFactory>();
+  EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller));
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<MockAsyncConnectionConfiguration>();
+  EXPECT_CALL((*connection_conf), GetPauseNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(100000000)));
+  EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31));
+  EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9));
+  EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(60000000000)));
+  EXPECT_CALL((*connection_conf), GetOperationTimeoutNs())
+      .Times(1)
+      .WillRepeatedly(Return(nanoseconds(1200000000000)));
+
+  /* init region locator */
+  auto region_locator = std::make_shared<AsyncRegionLocator>(region_location);
+
+  /* init hbase client connection */
+  auto conn = std::make_shared<MockAsyncConnection>();
+  EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf));
+  EXPECT_CALL((*conn), get_rpc_controller_factory())
+      .Times(AtLeast(1))
+      .WillRepeatedly(Return(controller_factory));
+  EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator));
+  EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client));
+
+  /* init retry caller factory */
+  auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
+  AsyncRpcRetryingCallerFactory<MockAsyncConnection> caller_factory(conn);
+
+  /* init request caller builder */
+  auto builder = caller_factory.Single<hbase::Result>();
+
+  /* call with retry to get result */
+  try {
+    auto async_caller =
+        builder->table(std::make_shared<TableName>(tn))
+            ->row(row)
+            ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs())
+            ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs())
+            ->action(
+                [=, &get](
+                    std::shared_ptr<hbase::HBaseRpcController> controller,
+                    std::shared_ptr<hbase::RegionLocation> loc,
+                    std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<hbase::Result> {
+                  return tableImpl->GetCall(rpc_client, controller, loc, get);
+                })
+            ->Build();
+
+    hbase::Result result = async_caller->Call().get();
+
+    /*Stopping the connection as we are getting segfault due to some folly issue
+     The connection stays open and we don't want that.
+     So we are stopping the connection.
+     We can remove this once we have fixed the folly part */
+    test_util.reset();
+
+    // Test the values, should be same as in put executed on hbase shell
+    ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty.";
+    EXPECT_EQ("test2", result.Row());
+    EXPECT_EQ("value2", *(result.Value("d", "2")));
+    EXPECT_EQ("value for extra", *(result.Value("d", "extra")));
+  } catch (const std::exception& e) {
+    LOG(ERROR) << e.what();
+    // Rethrow the original exception object; `throw e;` would slice it to std::exception.
+    throw;
+  }
+
+  table->Close();
+  client.Close();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 240da72..f0483ef 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -57,7 +57,8 @@ void Client::init(const hbase::Configuration &conf) {
   } else {
     LOG(WARNING) << "Not using RPC Cell Codec";
   }
-  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
+  rpc_client_ =
+      std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
   location_cache_ =
       std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index a96d6f3..e73ab70 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -89,7 +89,7 @@ class Client {
   bool is_closed_ = false;
 
   /** Methods */
-  void init(const hbase::Configuration &conf);
+  void init(const hbase::Configuration& conf);
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/filter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/filter.h b/hbase-native-client/core/filter.h
index b5b7133..10accaa 100644
--- a/hbase-native-client/core/filter.h
+++ b/hbase-native-client/core/filter.h
@@ -20,9 +20,9 @@
 #pragma once
 
 #include <memory>
+#include <set>
 #include <string>
 #include <utility>
-#include <set>
 #include <vector>
 
 #include "if/Comparator.pb.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/hbase-rpc-controller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc
new file mode 100644
index 0000000..bc53781
--- /dev/null
+++ b/hbase-native-client/core/hbase-rpc-controller.cc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/hbase-rpc-controller.h"
+
+namespace hbase {} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/hbase-rpc-controller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h
new file mode 100644
index 0000000..661c810
--- /dev/null
+++ b/hbase-native-client/core/hbase-rpc-controller.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <google/protobuf/service.h>
+#include <chrono>
+#include <string>
+
+using google::protobuf::RpcController;
+using google::protobuf::Closure;
+
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * Placeholder RpcController for the HBase client. Every override is a stub: state queries
+ * return fixed values and mutators do nothing.
+ */
+class HBaseRpcController : public RpcController {
+ public:
+  HBaseRpcController() {}
+  virtual ~HBaseRpcController() = default;
+
+  /** Intended to set the per-call timeout; currently a no-op, the value is discarded. */
+  void set_call_timeout(const milliseconds& call_timeout) {
+    // TODO:
+  }
+
+  /** No-op: there is no state to reset. */
+  void Reset() override {}
+
+  /** Always reports success. */
+  bool Failed() const override { return false; }
+
+  /** Always returns an empty string. */
+  std::string ErrorText() const override { return ""; }
+
+  /** No-op: cancellation is not implemented. */
+  void StartCancel() override {}
+
+  /** No-op: the reason is discarded and Failed() keeps returning false. */
+  void SetFailed(const std::string& reason) override {}
+
+  /** Always reports that the call was not canceled. */
+  bool IsCanceled() const override { return false; }
+
+  /** No-op: the callback is never stored or invoked. */
+  void NotifyOnCancel(Closure* callback) override {}
+};
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index da9f64a..17032fe 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -25,6 +25,7 @@
 
 #include <utility>
 
+#include <folly/Logging.h>
 #include "connection/response.h"
 #include "connection/rpc-connection.h"
 #include "if/Client.pb.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index b0411cb..4087d94 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,6 +26,8 @@
 
 namespace hbase {
 
+// Selects which region a lookup should return relative to a row.
+// NOTE(review): presumably kBefore/kAfter mean the region preceding/following the one that
+// holds the row, and kCurrent the region holding it - confirm against the locator.
+enum RegionLocateType { kBefore, kCurrent, kAfter };
+
 /**
  * @brief class to hold where a region is located.
  *
@@ -49,17 +51,17 @@ class RegionLocation {
   /**
    * Get a reference to the regio info
    */
-  const hbase::pb::RegionInfo &region_info() { return ri_; }
+  const hbase::pb::RegionInfo &region_info() const { return ri_; }
 
   /**
    * Get a reference to the server name
    */
-  const hbase::pb::ServerName &server_name() { return sn_; }
+  const hbase::pb::ServerName &server_name() const { return sn_; }
 
   /**
    * Get a reference to the region name.
    */
-  const std::string &region_name() { return region_name_; }
+  const std::string &region_name() const { return region_name_; }
 
   /**
    * Get a service. This could be closed or null. It's the caller's
@@ -79,7 +81,7 @@ class RegionLocation {
    */
   void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
 
-  const std::string DebugString() {
+  const std::string DebugString() const {
     return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/response_converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc
index 19a3554..2497306 100644
--- a/hbase-native-client/core/response_converter.cc
+++ b/hbase-native-client/core/response_converter.cc
@@ -19,6 +19,7 @@
 
 #include "core/response_converter.h"
 
+#include <string>
 #include <vector>
 
 #include "core/cell.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/response_converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h
index 859644b..759b1ce 100644
--- a/hbase-native-client/core/response_converter.h
+++ b/hbase-native-client/core/response_converter.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <memory>
+#include <vector>
 #include "connection/response.h"
 #include "core/result.h"
 #include "if/Client.pb.h"

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 4e30d4b..ba4dc29 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -71,4 +71,8 @@ void Table::Close() {
   is_closed_ = true;
 }
 
+// Looks up the region for `row` in this table through the location cache and waits for the
+// lookup result via get() before returning it.
+std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) {
+  return location_cache_->LocateRegion(*table_name_, row).get();
+}
+
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 0e98cd2..f82382e 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -57,6 +57,11 @@ class Table {
    */
   void Close();
 
+  /**
+   * @brief - Get region location for a row in current table.
+   * @param row - row whose region is looked up.
+   * @return shared pointer to the RegionLocation found for the row.
+   */
+  std::shared_ptr<RegionLocation> GetRegionLocation(const std::string &row);
+
  private:
   std::shared_ptr<TableName> table_name_;
   std::shared_ptr<hbase::LocationCache> location_cache_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/exceptions/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
new file mode 100644
index 0000000..a23654c
--- /dev/null
+++ b/hbase-native-client/exceptions/BUCK
@@ -0,0 +1,24 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Header-only target (no srcs) that exports exception.h. It depends on folly and is
+# visible only to targets under //core.
+cxx_library(
+    name="exceptions",
+    exported_headers=["exception.h",],
+    srcs=[],
+    deps=["//third-party:folly",],
+    compiler_flags=['-Weffc++'],
+    visibility=['//core/...'],)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/exceptions/exception.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
new file mode 100644
index 0000000..c0c4142
--- /dev/null
+++ b/hbase-native-client/exceptions/exception.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <exception>
+#include <string>
+#include <vector>
+#include <folly/io/IOBuf.h>
+
+namespace hbase {
+
+/**
+ * Pairs a recorded exception with the time it was observed and an optional free-form
+ * context string.
+ */
+class ThrowableWithExtraContext {
+public:
+  /**
+   * @param cause the recorded exception; may be null.
+   * @param when  timestamp supplied by the caller (stored, not interpreted here).
+   */
+  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+      const long& when) :
+      cause_(cause), when_(when), extras_("") {
+  }
+
+  /**
+   * @param cause  the recorded exception; may be null.
+   * @param when   timestamp supplied by the caller (stored, not interpreted here).
+   * @param extras additional context prepended by ToString().
+   */
+  ThrowableWithExtraContext(std::shared_ptr<std::exception> cause,
+      const long& when, const std::string& extras) :
+      cause_(cause), when_(when), extras_(extras) {
+  }
+
+  /**
+   * @return "<extras>, <cause message>"; a null cause is rendered as "(no cause)"
+   *         instead of being dereferenced.
+   */
+  std::string ToString() const {
+    // TODO:
+    // return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
+    return extras_ + ", " + (cause_ ? cause_->what() : "(no cause)");
+  }
+
+  /** @return the recorded exception, possibly null. */
+  std::shared_ptr<std::exception> cause() const {
+    return cause_;
+  }
+private:
+  std::shared_ptr<std::exception> cause_;
+  long when_;
+  std::string extras_;
+};
+
+/**
+ * Base class for I/O failures. Carries a message and, optionally, the exception that
+ * caused it.
+ */
+class IOException: public std::logic_error {
+public:
+  /** Creates an exception with message `what` and no cause. */
+  IOException(
+        const std::string& what) :
+        logic_error(what), cause_(nullptr) {}
+  /** Creates an exception with message `what` wrapping `cause` (may be null). */
+  IOException(
+      const std::string& what,
+      std::shared_ptr<std::exception> cause) :
+      logic_error(what), cause_(cause) {}
+  virtual ~IOException() = default;
+
+  /**
+   * @return the wrapped cause, or null when none was given. Const so it can be called on
+   *         an exception caught by const reference.
+   */
+  std::shared_ptr<std::exception> cause() const {
+    return cause_;
+  }
+private:
+  const std::shared_ptr<std::exception> cause_;
+};
+
+/**
+ * Raised when an operation has used up all its attempts. The message lists every recorded
+ * failure and the cause is the last recorded exception, if any.
+ */
+class RetriesExhaustedException: public IOException {
+public:
+  /**
+   * @param num_retries number of retries performed; the message reports num_retries + 1
+   *                    attempts.
+   * @param exceptions  failures recorded so far; may be null or empty.
+   */
+  RetriesExhaustedException(
+      const int& num_retries,
+      std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) :
+        IOException(
+            GetMessage(num_retries, exceptions),
+            LastCause(exceptions)){
+  }
+  virtual ~RetriesExhaustedException() = default;
+
+private:
+  // Both helpers are static: they run inside the mem-initializer list, before the
+  // IOException base exists, where calling a non-static member is undefined behaviour.
+
+  /** @return the cause of the most recent failure, or null when nothing was recorded. */
+  static std::shared_ptr<std::exception> LastCause(
+      const std::shared_ptr<std::vector<ThrowableWithExtraContext>>& exceptions) {
+    if (!exceptions || exceptions->empty()) {
+      return nullptr;
+    }
+    return exceptions->back().cause();
+  }
+
+  /** Builds "Failed after attempts=N, exceptions:" followed by one line per failure. */
+  static std::string GetMessage(
+      const int& num_retries,
+      std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
+    std::string buffer("Failed after attempts=");
+    buffer.append(std::to_string(num_retries + 1));
+    buffer.append(", exceptions:\n");
+    if (exceptions) {
+      for (auto& recorded : *exceptions) {
+        buffer.append(recorded.ToString());
+        buffer.append("\n");
+      }
+    }
+    return buffer;
+  }
+};
+
+/**
+ * HBase-specific I/O failure. Inherits IOException's constructors; without them the class
+ * could not be constructed because IOException has no default constructor.
+ */
+class HBaseIOException : public IOException {
+public:
+  using IOException::IOException;
+};
+
+/**
+ * Exception type the retrying caller checks for to decide that an operation must not be
+ * retried. Constructed the same way as HBaseIOException.
+ */
+class DoNotRetryIOException : public HBaseIOException {
+public:
+  using HBaseIOException::HBaseIOException;
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/utils/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK
index 796f2f5..eae929e 100644
--- a/hbase-native-client/utils/BUCK
+++ b/hbase-native-client/utils/BUCK
@@ -17,8 +17,11 @@
 
 cxx_library(
     name="utils",
-    exported_headers=["user-util.h", "version.h"],
-    srcs=["user-util.cc",],
+    exported_headers=[
+        "user-util.h", "version.h", "connection-util.h", "sys-util.h",
+        "time-util.h"
+    ],
+    srcs=["user-util.cc", "connection-util.cc"],
     deps=['//third-party:folly',],
     tests=[":user-util-test"],
     visibility=['PUBLIC',],

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/utils/connection-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/connection-util.cc b/hbase-native-client/utils/connection-util.cc
new file mode 100644
index 0000000..76689bf
--- /dev/null
+++ b/hbase-native-client/utils/connection-util.cc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "utils/connection-util.h"
+
+namespace hbase {
+
+// Multipliers applied to the base pause by ConnectionUtils::GetPauseTime, indexed by the
+// number of tries; the last entry is reused once tries exceed the table.
+const std::vector<uint32_t> ConnectionUtils::kRetryBackoff = {1,   2,   3,   5,   10,  20, 40,
+                                                              100, 100, 100, 100, 200, 200};
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/utils/connection-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h
new file mode 100644
index 0000000..f52c2f9
--- /dev/null
+++ b/hbase-native-client/utils/connection-util.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <climits>
+#include <cstdlib>
+#include <memory>
+#include <vector>
+#include "utils/time-util.h"
+
+namespace hbase {
+/** Helpers for turning retry settings into attempt counts and back-off pauses. */
+class ConnectionUtils {
+ public:
+  /**
+   * Converts a retry count into a number of attempts.
+   * @return retries + 1, never less than 1, and INT_MAX when retries is already INT_MAX.
+   */
+  static int Retries2Attempts(const int& retries) {
+    return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1);
+  }
+
+  /* Add a delta to avoid timeout immediately after a retry sleeping. */
+  static const uint64_t kSleepDeltaNs = 1000000;
+
+  static const std::vector<uint32_t> kRetryBackoff;
+  /**
+   * Calculate pause time. Built on {@link kRetryBackoff}.
+   * @param pause time to pause
+   * @param tries amount of tries; negative values are treated as 0 and values past the
+   *              end of the table use its last entry
+   * @return How long to wait after <code>tries</code> retries
+   */
+  static int64_t GetPauseTime(const int64_t& pause, const int32_t& tries) {
+    // Clamp while the index is still signed. Casting a negative value to size_t first
+    // would turn it into a huge index and select the largest back-off instead of the first.
+    const int32_t last_index = static_cast<int32_t>(kRetryBackoff.size()) - 1;
+    const int32_t ntries = std::min(std::max<int32_t>(tries, 0), last_index);
+
+    const int64_t normal_pause = pause * kRetryBackoff[ntries];
+    // 1% possible jitter; computed in double so large pauses are not rounded by float.
+    const double r = static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
+    const int64_t jitter = static_cast<int64_t>(normal_pause * r * 0.01);
+    return normal_pause + jitter;
+  }
+};
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/utils/sys-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h
new file mode 100644
index 0000000..68f00d7
--- /dev/null
+++ b/hbase-native-client/utils/sys-util.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <type_traits>
+
+namespace hbase {
+
+/** Small type-inspection helpers. */
+class SysUtil {
+ public:
+  /**
+   * Runtime check: true when the dynamic type of `object` is BASE or derives from it.
+   * The previous implementation negated the cast result and so returned the opposite.
+   * Not constexpr, because dynamic_cast cannot appear in a constant expression.
+   * DERIVED (the static type of `object`) must be polymorphic.
+   */
+  template <class BASE, typename DERIVED>
+  static bool InstanceOf(const DERIVED& object) {
+    return dynamic_cast<const BASE*>(&object) != nullptr;
+  }
+
+  /** Compile-time check: true when BASE is DERIVED or a base class of DERIVED. */
+  template <typename BASE, typename DERIVED>
+  static constexpr bool InstanceOf() {
+    return std::is_base_of<BASE, DERIVED>::value;
+  }
+};
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc2accc5/hbase-native-client/utils/time-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h
new file mode 100644
index 0000000..bbc3b35
--- /dev/null
+++ b/hbase-native-client/utils/time-util.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <chrono>
+#include <string>
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+/** Conversions between nanosecond counts and milliseconds, plus elapsed-time helpers. */
+class TimeUtil {
+ public:
+  /** @return `nanos` converted to whole milliseconds (truncated toward zero). */
+  static int64_t ToMillis(const int64_t& nanos) {
+    const std::chrono::nanoseconds as_nanos(nanos);
+    return std::chrono::duration_cast<std::chrono::milliseconds>(as_nanos).count();
+  }
+
+  /** @return `nanos` in whole milliseconds, formatted as a decimal string. */
+  static std::string ToMillisStr(const std::chrono::nanoseconds& nanos) {
+    const auto as_millis = std::chrono::duration_cast<std::chrono::milliseconds>(nanos);
+    return std::to_string(as_millis.count());
+  }
+
+  /** @return high_resolution_clock's current time since its epoch, in nanoseconds. */
+  static int64_t GetNowNanos() {
+    const auto since_epoch = std::chrono::high_resolution_clock::now().time_since_epoch();
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
+  }
+
+  /** @return whole milliseconds between `start_ns` (a GetNowNanos() reading) and now. */
+  static int64_t ElapsedMillis(const int64_t& start_ns) {
+    return ToMillis(GetNowNanos() - start_ns);
+  }
+
+  /** @return ElapsedMillis(start_ns) formatted as a decimal string. */
+  static std::string ElapsedMillisStr(const int64_t& start_ns) {
+    return std::to_string(ElapsedMillis(start_ns));
+  }
+};
+} /* namespace hbase */


Mime
View raw message