hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-17576 [C++] Implement request retry mechanism over RPC for Multi calls. (Sudeep Sunthankar)
Date Tue, 16 May 2017 18:43:18 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 018f1eab2 -> ccfc68251


HBASE-17576 [C++] Implement request retry mechanism over RPC for Multi calls. (Sudeep Sunthankar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccfc6825
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccfc6825
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccfc6825

Branch: refs/heads/HBASE-14850
Commit: ccfc68251658f4b30081553be62797d18540a8b4
Parents: 018f1ea
Author: Enis Soztutar <enis@apache.org>
Authored: Tue May 16 11:43:04 2017 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue May 16 11:43:04 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/core/BUCK                   |   2 +
 hbase-native-client/core/action.h               |  13 +-
 .../core/async-batch-rpc-retrying-caller.cc     | 501 +++++++++++++++++++
 .../core/async-batch-rpc-retrying-caller.h      | 183 +++++++
 .../core/async-rpc-retrying-caller-factory.h    |  84 +++-
 .../core/async-rpc-retrying-caller.cc           |   4 +-
 .../core/async-rpc-retrying-test.cc             |   4 +-
 hbase-native-client/core/client-test.cc         |  53 ++
 hbase-native-client/core/raw-async-table.cc     |  22 +-
 hbase-native-client/core/raw-async-table.h      |   9 +-
 hbase-native-client/core/request-converter.cc   |  13 +-
 hbase-native-client/core/simple-client.cc       |  22 +-
 hbase-native-client/core/table.cc               |  18 +-
 hbase-native-client/core/table.h                |   3 +-
 14 files changed, 897 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 412ee3b..e9fc716 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -53,6 +53,7 @@ cxx_library(
         "region-result.h",
         "row.h",
         "server-request.h",
+        "async-batch-rpc-retrying-caller.h",
     ],
     srcs=[
         "async-connection.cc",
@@ -77,6 +78,7 @@ cxx_library(
         "zk-util.cc",
         "multi-response.cc",
         "region-result.cc",
+        "async-batch-rpc-retrying-caller.cc",
     ],
     deps=[
         "//exceptions:exceptions",

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/action.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
index 3511683..21a0181 100644
--- a/hbase-native-client/core/action.h
+++ b/hbase-native-client/core/action.h
@@ -20,24 +20,23 @@
 #pragma once
 
 #include <memory>
-#include "core/row.h"
+#include "core/get.h"
 
-using hbase::Row;
 namespace hbase {
 
 class Action {
  public:
-  Action(std::shared_ptr<Row> action, int original_index)
+  Action(std::shared_ptr<hbase::Get> action, int32_t original_index)
       : action_(action), original_index_(original_index) {}
   ~Action() {}
 
-  int64_t original_index() const { return original_index_; }
+  int32_t original_index() const { return original_index_; }
 
-  std::shared_ptr<Row> action() const { return action_; }
+  std::shared_ptr<hbase::Get> action() const { return action_; }
 
  private:
-  std::shared_ptr<Row> action_;
-  int64_t original_index_;
+  std::shared_ptr<hbase::Get> action_;
+  int32_t original_index_;
   int64_t nonce_ = -1;
   int32_t replica_id_ = -1;
 };

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
new file mode 100644
index 0000000..f3be637
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-batch-rpc-retrying-caller.h"
+#include <glog/logging.h>
+#include <limits>
+
+using folly::Future;
+using folly::Promise;
+using folly::Try;
+
+using folly::Future;
+using folly::Promise;
+using folly::Try;
+using hbase::Action;
+using hbase::LocationCache;
+using hbase::MultiResponse;
+using hbase::RegionLocation;
+using hbase::RegionRequest;
+using hbase::RequestConverter;
+using hbase::Result;
+using hbase::RpcClient;
+using hbase::ServerRequest;
+using hbase::pb::ServerName;
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+using wangle::CPUThreadPoolExecutor;
+
+namespace hbase {
+
+AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<TableName> table_name, const std::vector<hbase::Get> &actions,
+    nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns,
+    nanoseconds rpc_timeout_ns, int32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      table_name_(table_name),
+      pause_ns_(pause_ns),
+      operation_timeout_ns_(operation_timeout_ns),
+      rpc_timeout_ns_(rpc_timeout_ns),
+      start_log_errors_count_(start_log_errors_count) {
+  CHECK(conn_ != nullptr);
+  CHECK(retry_timer_ != nullptr);
+  location_cache_ = conn_->region_locator();
+  rpc_client_ = conn_->rpc_client();
+  cpu_pool_ = conn_->cpu_executor();
+  CHECK(location_cache_ != nullptr);
+  CHECK(rpc_client_ != nullptr);
+  CHECK(cpu_pool_ != nullptr);
+
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
+  uint32_t index = 0;
+  for (auto row : actions) {
+    actions_.push_back(std::make_shared<Action>(std::make_shared<hbase::Get>(row), index));
+    Promise<std::shared_ptr<Result>> prom{};
+    action2promises_.insert(
+        std::pair<uint64_t, Promise<std::shared_ptr<Result>>>(index, std::move(prom)));
+    action2futures_.push_back(action2promises_[index++].getFuture());
+  }
+}
+
+AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() {}
+
+Future<std::vector<Try<std::shared_ptr<Result>>>> AsyncBatchRpcRetryingCaller::Call() {
+  GroupAndSend(actions_, 1);
+  return collectAll(action2futures_);
+}
+
+int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
+  return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+}
+
+void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
+                                               std::shared_ptr<RegionRequest> region_request,
+                                               std::shared_ptr<std::exception> &error,
+                                               std::shared_ptr<ServerName> server_name) {
+  if (tries > start_log_errors_count_) {
+    std::string regions;
+    regions += region_request->region_location()->region_name() + ", ";
+    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+                 << table_name_->qualifier() << " from " << server_name->host_name()
+                 << " failed, tries=" << tries << ":- " << error->what();
+  }
+}
+
+void AsyncBatchRpcRetryingCaller::LogException(
+    int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+    std::shared_ptr<std::exception> &error, std::shared_ptr<ServerName> server_name) {
+  if (tries > start_log_errors_count_) {
+    std::string regions;
+    for (const auto region_request : region_requests) {
+      regions += region_request->region_location()->region_name() + ", ";
+    }
+    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+                 << table_name_->qualifier() << " from " << server_name->host_name()
+                 << " failed, tries=" << tries << error->what();
+  }
+}
+
+const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
+    std::shared_ptr<ServerName> server_name) {
+  return server_name ? server_name->ShortDebugString() : "";
+}
+
+// TODO HBASE-17800 pass folly ew instead of std::exception
+void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
+                                           std::shared_ptr<std::exception> error,
+                                           std::shared_ptr<ServerName> server_name) {
+  folly::exception_wrapper ew;
+  ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+  AddAction2Error(action->original_index(), twec);
+}
+
+void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
+                                           std::shared_ptr<std::exception> error,
+                                           std::shared_ptr<ServerName> server_name) {
+  for (const auto action : actions) {
+    AddError(action, error, server_name);
+  }
+}
+
+// TODO HBASE-17800 pass folly ew instead of std::exception
+void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
+                                          std::shared_ptr<std::exception> error,
+                                          int64_t current_time, const std::string extras) {
+  auto action_index = action->original_index();
+  auto itr = action2promises_.find(action_index);
+  if (itr != action2promises_.end()) {
+    if (itr->second.isFulfilled()) {
+      return;
+    }
+  }
+  folly::exception_wrapper ew;
+  ThrowableWithExtraContext twec(ew, current_time, extras);
+  AddAction2Error(action_index, twec);
+  action2promises_[action_index].setException(
+      RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+}
+
+void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
+                                          int32_t tries, std::shared_ptr<std::exception> error,
+                                          std::shared_ptr<ServerName> server_name) {
+  for (const auto action : actions) {
+    FailOne(action, tries, error, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+  }
+}
+
+void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
+                                          int32_t tries) {
+  for (const auto action : actions) {
+    auto action_index = action->original_index();
+    auto itr = action2promises_.find(action_index);
+    if (itr->second.isFulfilled()) {
+      return;
+    }
+    action2promises_[action_index].setException(
+        RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+  }
+}
+
+void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
+                                                  const ThrowableWithExtraContext &twec) {
+  auto erritr = action2errors_.find(action_index);
+  if (erritr != action2errors_.end()) {
+    erritr->second->push_back(twec);
+  } else {
+    action2errors_[action_index] = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+    action2errors_[action_index]->push_back(twec);
+  }
+  return;
+}
+
+void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
+                                          std::shared_ptr<std::exception> exc,
+                                          std::shared_ptr<ServerName> server_name) {
+  std::vector<std::shared_ptr<Action>> copied_actions;
+  std::vector<std::shared_ptr<RegionRequest>> region_requests;
+  for (const auto &action_by_region : actions_by_region) {
+    region_requests.push_back(action_by_region.second);
+    // Concurrent
+    for (const auto &action : action_by_region.second->actions()) {
+      copied_actions.push_back(action);
+    }
+  }
+  // TODO HBASE-17800 for exc check with DoNotRetryIOException
+  LogException(tries, region_requests, exc, server_name);
+  if (tries >= max_attempts_) {
+    FailAll(copied_actions, tries, exc, server_name);
+    return;
+  }
+  AddError(copied_actions, exc, server_name);
+  TryResubmit(copied_actions, tries);
+}
+
+void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action>> actions,
+                                              int32_t tries) {
+  int64_t delay_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      VLOG(8) << "Fail All from onError";
+      FailAll(actions, tries);
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
+  }
+  // TODO This gives segfault @ present, when retried
+  //    retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); },
+  //                                  milliseconds(TimeUtil::ToMillis(delay_ns)));
+}
+
+Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
+AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_ptr<Action>> &actions,
+                                                int64_t locate_timeout_ns) {
+  auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{};
+  for (auto const &action : actions) {
+    locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(),
+                                                 RegionLocateType::kCurrent, locate_timeout_ns));
+  }
+
+  return collectAll(locs);
+}
+
+void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions,
+                                               int32_t tries) {
+  int64_t locate_timeout_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    locate_timeout_ns = RemainingTimeNs();
+    if (locate_timeout_ns <= 0) {
+      FailAll(actions_, tries);
+      return;
+    }
+  } else {
+    locate_timeout_ns = -1L;
+  }
+
+  GetRegionLocations(actions, locate_timeout_ns)
+      .then([&](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        ActionsByServer actions_by_server;
+        std::vector<std::shared_ptr<Action>> locate_failed;
+
+        for (uint64_t i = 0; i < loc.size(); ++i) {
+          auto action = actions[i];
+          if (loc[i].hasValue()) {
+            auto region_loc = loc[i].value();
+            // Add it to actions_by_server;
+            // Concurrent
+            auto search =
+                actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
+            if (search != actions_by_server.end()) {
+              search->second->AddActionsByRegion(region_loc, action);
+            } else {
+              // Create new key
+              auto server_request = std::make_shared<ServerRequest>(region_loc);
+              server_request->AddActionsByRegion(region_loc, action);
+              auto server_name = std::make_shared<ServerName>(region_loc->server_name());
+              actions_by_server[server_name] = server_request;
+            }
+            locate_failed.push_back(action);
+            VLOG(8) << "row [" << action->action()->row() << "] of table["
+                    << table_name_->namespace_() << ":" << table_name_->qualifier()
+                    << " found in region [" << region_loc->region_name() << "]; host["
+                    << region_loc->server_name().host_name() << "]; port["
+                    << region_loc->server_name().port() << "];";
+          } else if (loc[i].hasException()) {
+            VLOG(8) << "Exception occured while locating region:- "
+                    << loc[i].exception().getCopied()->what() << " for action index " << i;
+            // TODO Feedback needed, Java API only identifies DoNotRetryIOException
+            // We might receive runtime error from location-cache.cc too, we are treating both same
+            if (loc[i].exception().is_compatible_with<std::runtime_error>()) {
+              std::string extra = "";
+              FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
+                      loc[i].exception().what().toStdString());
+              return;
+            }
+            // TODO HBASE-17800 for exc check with DoNotRetryIOException
+            /*
+             else if (loc[i].exception().is_compatible_with<hbase::DoNotRetryIOException>()) {
+             int64_t current_time = 0;
+             std::string extra = "";
+             FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
+             loc[i].exception().what().toStdString());
+             return;
+             }*/
+            AddError(action, std::make_shared<std::exception>(*loc[i].exception().getCopied()),
+                     nullptr);
+            locate_failed.push_back(action);
+          }
+        }
+
+        if (!actions_by_server.empty()) {
+          Send(actions_by_server, tries);
+        }
+
+        if (!locate_failed.empty()) {
+          TryResubmit(locate_failed, tries);
+        }
+      })
+      .onError([&](const folly::exception_wrapper &ew) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        auto exc = ew.getCopied();
+        VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString();
+      });
+  return;
+}
+
+Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
+    const ActionsByServer &actions_by_server) {
+  // Concurrent.
+  auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
+  auto user = User::defaultUser();
+  for (const auto &action_by_server : actions_by_server) {
+    std::unique_ptr<Request> multi_req =
+        RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region());
+    auto host = action_by_server.first->host_name();
+    int port = action_by_server.first->port();
+    multi_calls.push_back(
+        rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService"));
+  }
+  return collectAll(multi_calls);
+}
+
+void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) {
+  int64_t remaining_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    remaining_ns = RemainingTimeNs();
+    if (remaining_ns <= 0) {
+      std::vector<std::shared_ptr<Action>> failed_actions;
+      for (const auto &action_by_server : actions_by_server) {
+        // Concurrent
+        for (auto &value : action_by_server.second->actions_by_region()) {
+          // Concurrent
+          for (const auto &failed_action : value.second->actions()) {
+            failed_actions.push_back(failed_action);
+          }
+        }
+      }
+      FailAll(failed_actions, tries);
+      return;
+    }
+  } else {
+    remaining_ns = std::numeric_limits<int64_t>::max();
+  }
+
+  std::vector<std::shared_ptr<Request>> multi_reqv;
+  for (const auto &action_by_server : actions_by_server)
+    multi_reqv.push_back(
+        std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region())));
+
+  GetMultiResponse(actions_by_server)
+      .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        for (uint64_t num = 0; num < completed_responses.size(); ++num) {
+          if (completed_responses[num].hasValue()) {
+            auto multi_response =
+                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value());
+            for (const auto &action_by_server : actions_by_server) {
+              OnComplete(action_by_server.second->actions_by_region(), tries,
+                         action_by_server.first, std::move(multi_response));
+            }
+          } else if (completed_responses[num].hasException()) {
+            VLOG(8) << "Received exception: "
+                    << completed_responses[num].exception().getCopied()->what()
+                    << " from server for action index " << num;
+            // TODO: we should call OnError here as well.
+          }
+        }
+      })
+      .onError([=](const folly::exception_wrapper &ew) {
+        auto exc = ew.getCopied();
+        VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString();
+        std::lock_guard<std::mutex> lock(multi_mutex_);
+        for (const auto &action_by_server : actions_by_server) {
+          OnError(action_by_server.second->actions_by_region(), tries,
+                  std::make_shared<std::exception>(*exc), action_by_server.first);
+        }
+      });
+  return;
+}
+
+void AsyncBatchRpcRetryingCaller::OnComplete(
+    const ActionsByRegion &actions_by_region, int32_t tries,
+    const std::shared_ptr<ServerName> server_name,
+    const std::unique_ptr<hbase::MultiResponse> multi_response) {
+  std::vector<std::shared_ptr<Action>> failed_actions;
+  for (const auto &action_by_region : actions_by_region) {
+    auto region_result_itr = multi_response->RegionResults().find(action_by_region.first);
+    if (region_result_itr == multi_response->RegionResults().end()) {
+      VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults.";
+      // TODO Feedback needed Should we throw from here or continue for next action_by_region ?
+      // Throwing at present as this looks like an inconsistency
+      // Concurrent
+      auto exc = std::make_shared<std::runtime_error>("Invalid search for region " +
+                                                      action_by_region.first + " in multi results");
+      FailAll(action_by_region.second->actions(), tries, exc, server_name);
+      return;
+      // std::runtime_error(
+      //  "Invalid search for region " + action_by_region.first + " in multi results");
+    }
+    if (region_result_itr != multi_response->RegionResults().end()) {
+      // Concurrent
+      for (const auto &action : action_by_region.second->actions()) {
+        OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
+                   failed_actions);
+      }
+    } else {
+      auto region_exc = multi_response->RegionException(action_by_region.first);
+      std::shared_ptr<std::exception> pexc;
+      if (region_exc == nullptr) {
+        VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first;
+        pexc = std::make_shared<std::exception>(std::runtime_error("Invalid response"));
+        // TODO: raise this exception to the application
+      } else {
+        // TODO HBASE-17800 for exc check with DoNotRetryIOException
+        LogException(tries, action_by_region.second, region_exc, server_name);
+        location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
+                                              *region_exc);
+        std::string row_name;
+        if (tries >= max_attempts_) {
+          // Concurrent
+          FailAll(action_by_region.second->actions(), tries, region_exc, server_name);
+          return;
+        }
+        // Concurrent
+        AddError(action_by_region.second->actions(), region_exc, server_name);
+        for (const auto &action : action_by_region.second->actions()) {
+          failed_actions.push_back(action);
+        }
+      }
+    }
+  }
+  if (!failed_actions.empty()) {
+    TryResubmit(failed_actions, tries);
+  }
+  return;
+}
+
+void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &action,
+                                             const std::shared_ptr<RegionRequest> &region_request,
+                                             int32_t tries,
+                                             const std::shared_ptr<ServerName> &server_name,
+                                             const std::shared_ptr<RegionResult> &region_result,
+                                             std::vector<std::shared_ptr<Action>> &failed_actions) {
+  std::string err_msg;
+  try {
+    auto result_or_exc = region_result->ResultOrException(action->original_index());
+    auto result = std::get<0>(*result_or_exc);
+    auto exc = std::get<1>(*result_or_exc);
+    std::shared_ptr<std::exception> pexc;
+    if (exc != nullptr) {
+      LogException(tries, region_request, exc, server_name);
+      if (tries >= max_attempts_) {
+        FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+      } else {
+        failed_actions.push_back(action);
+      }
+    } else if (result != nullptr) {
+      action2promises_[action->original_index()].setValue(std::move(result));
+    } else {
+      VLOG(8) << "Server " << server_name->ShortDebugString()
+              << " sent us neither results nor exceptions for request @ index "
+              << action->original_index() << ", row " << action->action()->row() << " of "
+              << region_request->region_location()->region_name();
+      err_msg = "Invalid response";
+      AddError(action, std::make_shared<std::runtime_error>(err_msg), server_name);
+      failed_actions.push_back(action);
+    }
+  } catch (const std::out_of_range &oor) {
+    // TODO Feedback needed. Should we retry for he specific index again ?
+    // This should never occur, so we are throwing a std::runtime_error from here
+    VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row "
+            << action->action()->row() << " of "
+            << region_request->region_location()->region_name();
+    throw std::runtime_error("ResultOrException not present @ index " +
+                             std::to_string(action->original_index()));
+  }
+  return;
+}
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-batch-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
new file mode 100644
index 0000000..6803a0e
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+#include <folly/futures/Try.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <string>
+#include <tuple>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/action.h"
+#include "core/async-connection.h"
+#include "core/location-cache.h"
+#include "core/multi-response.h"
+#include "core/region-location.h"
+#include "core/region-request.h"
+#include "core/region-result.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/server-request.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "security/user.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+namespace hbase {
+/* Equals function for ServerName */
+struct ServerNameEquals {
+  bool operator()(const std::shared_ptr<ServerName> &lhs,
+                  const std::shared_ptr<ServerName> &rhs) const {
+    return (lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() &&
+            lhs->port() == rhs->port());
+  }
+};
+
+struct ServerNameHash {
+  /** hash */
+  std::size_t operator()(const std::shared_ptr<ServerName> &sn) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, sn->start_code());
+    boost::hash_combine(h, sn->host_name());
+    boost::hash_combine(h, sn->port());
+    return h;
+  }
+};
+
+class AsyncBatchRpcRetryingCaller {
+ public:
+  using ActionsByServer =
+      std::unordered_map<std::shared_ptr<ServerName>, std::shared_ptr<ServerRequest>,
+                         ServerNameHash, ServerNameEquals>;
+  using ActionsByRegion = ServerRequest::ActionsByRegion;
+
+  AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                              std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                              std::shared_ptr<pb::TableName> table_name,
+                              const std::vector<hbase::Get> &actions, nanoseconds pause_ns,
+                              int32_t max_attempts, nanoseconds operation_timeout_ns,
+                              nanoseconds rpc_timeout_ns, int32_t start_log_errors_count);
+
+  ~AsyncBatchRpcRetryingCaller();
+
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call();
+
+ private:
+  int64_t RemainingTimeNs();
+
+  void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
+                    std::shared_ptr<std::exception> &error,
+                    std::shared_ptr<ServerName> server_name);
+
+  void LogException(int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+                    std::shared_ptr<std::exception> &error,
+                    std::shared_ptr<ServerName> server_name);
+
+  const std::string GetExtraContextForError(std::shared_ptr<ServerName> server_name);
+
+  void AddError(const std::shared_ptr<Action> &action, std::shared_ptr<std::exception> error,
+                std::shared_ptr<ServerName> server_name);
+
+  void AddError(const std::vector<std::shared_ptr<Action>> &actions,
+                std::shared_ptr<std::exception> error, std::shared_ptr<ServerName> server_name);
+
+  void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
+               std::shared_ptr<std::exception> error, int64_t current_time,
+               const std::string extras);
+
+  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
+               std::shared_ptr<std::exception> error, std::shared_ptr<ServerName> server_name);
+
+  void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
+
+  void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
+
+  void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
+               std::shared_ptr<std::exception> exc, std::shared_ptr<ServerName> server_name);
+
+  void TryResubmit(std::vector<std::shared_ptr<Action>> actions, int32_t tries);
+
+  folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
+      const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
+
+  void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
+
+  folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
+      const ActionsByServer &actions_by_server);
+
+  void Send(ActionsByServer &actions_by_server, int32_t tries);
+
+  void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
+                  const std::shared_ptr<ServerName> server_name,
+                  const std::unique_ptr<MultiResponse> multi_results);
+
+  void OnComplete(const std::shared_ptr<Action> &action,
+                  const std::shared_ptr<RegionRequest> &region_request, int32_t tries,
+                  const std::shared_ptr<ServerName> &server_name,
+                  const std::shared_ptr<RegionResult> &region_result,
+                  std::vector<std::shared_ptr<Action>> &failed_actions);
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::AsyncConnection> conn_;
+  std::shared_ptr<pb::TableName> table_name_;
+  std::vector<std::shared_ptr<Action>> actions_;
+  nanoseconds pause_ns_;
+  int32_t max_attempts_ = 0;
+  nanoseconds operation_timeout_ns_;
+  nanoseconds rpc_timeout_ns_;
+  int32_t start_log_errors_count_ = 0;
+
+  int64_t start_ns_ = TimeUtil::GetNowNanos();
+  int32_t tries_ = 1;
+  std::map<uint64_t, folly::Promise<std::shared_ptr<Result>>> action2promises_;
+  std::vector<folly::Future<std::shared_ptr<Result>>> action2futures_;
+  std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
+
+  std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
+  std::shared_ptr<RpcClient> rpc_client_ = nullptr;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
+
+  std::mutex multi_mutex_;
+};
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-caller-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 5a80a06..f1ffdac 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -19,20 +19,19 @@
 #pragma once
 
 #include <folly/Logging.h>
-#include <folly/io/IOBuf.h>
 #include <folly/io/async/EventBase.h>
 #include <chrono>
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "connection/rpc-client.h"
+#include "core/async-batch-rpc-retrying-caller.h"
 #include "core/async-rpc-retrying-caller.h"
+#include "core/row.h"
 #include "if/Client.pb.h"
 #include "if/HBase.pb.h"
 
-using hbase::pb::TableName;
-using std::chrono::nanoseconds;
-
 namespace hbase {
 
 class AsyncConnection;
@@ -58,7 +57,7 @@ class SingleRequestCallerBuilder
   typedef SingleRequestCallerBuilder<RESP> GenenericThisType;
   typedef std::shared_ptr<GenenericThisType> SharedThisPtr;
 
-  SharedThisPtr table(std::shared_ptr<TableName> table_name) {
+  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
     table_name_ = table_name;
     return shared_this();
   }
@@ -119,7 +118,7 @@ class SingleRequestCallerBuilder
  private:
   std::shared_ptr<AsyncConnection> conn_;
   std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<TableName> table_name_;
+  std::shared_ptr<pb::TableName> table_name_;
   nanoseconds rpc_timeout_nanos_;
   nanoseconds operation_timeout_nanos_;
   nanoseconds pause_;
@@ -130,6 +129,75 @@ class SingleRequestCallerBuilder
   Callable<RESP> callable_;
 };  // end of SingleRequestCallerBuilder
 
+class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder> {
+ public:
+  explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+                              std::shared_ptr<folly::HHWheelTimer> retry_timer)
+      : conn_(conn), retry_timer_(retry_timer) {}
+
+  virtual ~BatchCallerBuilder() = default;
+
+  typedef std::shared_ptr<BatchCallerBuilder> SharedThisPtr;
+
+  SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
+    table_name_ = table_name;
+    return shared_this();
+  }
+
+  SharedThisPtr actions(std::shared_ptr<std::vector<hbase::Get>> actions) {
+    actions_ = actions;
+    return shared_this();
+  }
+
+  SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) {
+    operation_timeout_nanos_ = operation_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr pause(nanoseconds pause_ns) {
+    pause_ns_ = pause_ns;
+    return shared_this();
+  }
+
+  SharedThisPtr max_attempts(int32_t max_attempts) {
+    max_attempts_ = max_attempts;
+    return shared_this();
+  }
+
+  SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) {
+    start_log_errors_count_ = start_log_errors_count;
+    return shared_this();
+  }
+
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call() { return Build()->Call(); }
+
+  std::shared_ptr<AsyncBatchRpcRetryingCaller> Build() {
+    return std::make_shared<AsyncBatchRpcRetryingCaller>(
+        conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
+        operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
+  }
+
+ private:
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this();
+  }
+
+ private:
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
+  std::shared_ptr<std::vector<hbase::Get>> actions_ = nullptr;
+  nanoseconds pause_ns_;
+  int32_t max_attempts_ = 0;
+  nanoseconds operation_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  int32_t start_log_errors_count_ = 0;
+};
 class AsyncRpcRetryingCallerFactory {
  private:
   std::shared_ptr<AsyncConnection> conn_;
@@ -146,6 +214,10 @@ class AsyncRpcRetryingCallerFactory {
   std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() {
     return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
   }
+
+  std::shared_ptr<BatchCallerBuilder> Batch() {
+    return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
+  }
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 7e211f7..f8b237b 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -143,9 +143,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
    */
   conn_->retry_executor()->add([&]() {
     retry_timer_->scheduleTimeoutFn(
-        [this]() {
-          conn_->cpu_executor()->add([&]() { LocateThenCall(); });
-        },
+        [this]() { conn_->cpu_executor()->add([&]() { LocateThenCall(); }); },
         milliseconds(TimeUtil::ToMillis(delay_ns)));
   });
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index ff28e79..487c34c 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -159,9 +159,9 @@ class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
   uint32_t num_fails_ = 0;
 
  public:
-  explicit  MockFailingAsyncRegionLocator(uint32_t num_fails)
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
       : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
-  explicit  MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
       : AsyncRegionLocatorBase(region_location) {}
   virtual ~MockFailingAsyncRegionLocator() {}
   folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 274168f..1c6ec4a 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -234,3 +234,56 @@ TEST_F(ClientTest, PutsWithTimestamp) {
   table->Close();
   client.Close();
 }
+
+TEST_F(ClientTest, MultiGets) {
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable("t", "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t num_rows = 10000;
+  // Perform Puts
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+
+  // Perform the Gets
+  std::vector<hbase::Get> gets;
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    auto row = "test" + std::to_string(i);
+    hbase::Get get(row);
+    gets.push_back(get);
+  }
+  gets.push_back(hbase::Get("test2"));
+  gets.push_back(hbase::Get("testextra"));
+
+  auto results = table->Get(gets);
+
+  // Test the values, should be same as in put executed on hbase shell
+  ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty.";
+
+  uint32_t i = 0;
+  for (; i < num_rows; ++i) {
+    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                        << " must not be empty";
+    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+    EXPECT_EQ("value" + std::to_string(i), *results[i]->Value("d", std::to_string(i)).get());
+  }
+  // We are inserting test2 twice so the below test should pass
+  ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must not be empty";
+
+  ++i;
+  ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty";
+
+  table->Close();
+  client.Close();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 2bc9f36..9e0d4a3 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -91,4 +91,24 @@ Future<Unit> RawAsyncTable::Put(const hbase::Put& put) {
   return caller->Call().then([caller](const auto r) { return r; });
 }
 
-} /* namespace hbase */
+Future<std::vector<Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
+    const std::vector<hbase::Get>& gets) {
+  return this->Batch(gets);
+}
+
+Future<std::vector<Try<std::shared_ptr<Result>>>> RawAsyncTable::Batch(
+    const std::vector<hbase::Get>& gets) {
+  auto caller = connection_->caller_factory()
+                    ->Batch()
+                    ->table(table_name_)
+                    ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
+                    ->rpc_timeout(connection_conf_->read_rpc_timeout())
+                    ->operation_timeout(connection_conf_->operation_timeout())
+                    ->pause(connection_conf_->pause())
+                    ->max_attempts(connection_conf_->max_retries())
+                    ->start_log_errors_count(connection_conf_->start_log_errors_count())
+                    ->Build();
+
+  return caller->Call().then([caller](auto r) { return r; });
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 978a2b8..e26d46e 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -20,11 +20,11 @@
 
 #include <folly/futures/Future.h>
 #include <folly/futures/Unit.h>
-
 #include <chrono>
 #include <memory>
 #include <string>
-
+#include <vector>
+#include "core/async-batch-rpc-retrying-caller.h"
 #include "core/async-connection.h"
 #include "core/async-rpc-retrying-caller-factory.h"
 #include "core/async-rpc-retrying-caller.h"
@@ -34,6 +34,7 @@
 #include "core/result.h"
 
 using folly::Future;
+using folly::Try;
 using folly::Unit;
 using hbase::pb::TableName;
 using std::chrono::nanoseconds;
@@ -59,6 +60,9 @@ class RawAsyncTable {
   Future<Unit> Put(const hbase::Put& put);
   void Close() {}
 
+  Future<std::vector<Try<std::shared_ptr<Result>>>> Get(const std::vector<hbase::Get>& gets);
+  Future<std::vector<Try<std::shared_ptr<Result>>>> Batch(const std::vector<hbase::Get>& gets);
+
  private:
   /* Data */
   std::shared_ptr<AsyncConnection> connection_;
@@ -78,5 +82,4 @@ class RawAsyncTable {
   std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(std::string row,
                                                                         nanoseconds rpc_timeout);
 };
-
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 4c12ee7..c90e1ab 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -50,7 +50,6 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
   auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg());
   RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
   pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release());
-
   return pb_req;
 }
 
@@ -114,12 +113,12 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
     int action_num = 0;
     for (const auto &region_action : action_by_region.second->actions()) {
       auto pb_action = pb_region_action->add_action();
-      auto action = region_action->action();
-      if (auto pget = std::dynamic_pointer_cast<Get>(action)) {
-        auto pb_get = RequestConverter::ToGet(*pget.get());
-        pb_action->set_allocated_get(pb_get.release());
-        pb_action->set_index(action_num);
-      }
+      auto pget = region_action->action();
+      // We store only hbase::Get in hbase::Action as of now. It will be changed later on.
+      CHECK(pget) << "Unexpected. action can't be null";
+      auto pb_get = RequestConverter::ToGet(*pget);
+      pb_action->set_allocated_get(pb_get.release());
+      pb_action->set_index(action_num);
       action_num++;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index b417353..3a7d62b 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -26,6 +26,7 @@
 #include <iostream>
 #include <thread>
 
+#include "connection/rpc-client.h"
 #include "core/client.h"
 #include "core/get.h"
 #include "core/put.h"
@@ -75,14 +76,16 @@ int main(int argc, char *argv[]) {
   conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
 
   auto row = FLAGS_row;
+
   auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
   auto num_puts = FLAGS_num_rows;
 
   auto client = std::make_unique<Client>(*conf);
   auto table = client->Table(*tn);
 
-  // Do the Put requests
   auto start_ns = TimeUtil::GetNowNanos();
+
+  // Do the Put requests
   for (uint64_t i = 0; i < num_puts; i++) {
     table->Put(*MakePut(Row(FLAGS_row, i)));
   }
@@ -102,6 +105,23 @@ int main(int argc, char *argv[]) {
   LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
             << TimeUtil::ElapsedMillis(start_ns) << " ms.";
 
+  // Do the Multi-Gets
+  std::vector<hbase::Get> gets;
+  for (uint64_t i = 0; i < num_puts; ++i) {
+    hbase::Get get(Row(FLAGS_row, i));
+    gets.push_back(get);
+  }
+
+  start_ns = TimeUtil::GetNowNanos();
+  auto results = table->Get(gets);
+
+  if (FLAGS_display_results) {
+    for (const auto &result : results) LOG(INFO) << result->DebugString();
+  }
+
+  LOG(INFO) << "Successfully sent  " << gets.size() << " Multi-Get requests in "
+            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
   table->Close();
   client->Close();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 8ace4af..a2f31d9 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -19,7 +19,6 @@
 
 #include "core/table.h"
 
-#include <folly/futures/Future.h>
 #include <chrono>
 #include <limits>
 #include <utility>
@@ -33,7 +32,6 @@
 #include "serde/server-name.h"
 #include "utils/time-util.h"
 
-using folly::Future;
 using hbase::pb::TableName;
 using hbase::security::User;
 using std::chrono::milliseconds;
@@ -69,4 +67,20 @@ std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row)
   return async_connection_->region_locator()->LocateRegion(*table_name_, row).get();
 }
 
+std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::Get> &gets) {
+  auto tresults = async_table_->Get(gets).get(operation_timeout());
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  uint32_t num = 0;
+  for (auto tresult : tresults) {
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().getCopied()->what() << " for "
+                 << gets[num++].row();
+      throw tresult.exception().getCopied();
+    }
+  }
+  return results;
+}
+
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccfc6825/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index cbb95b7..142baae 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -54,8 +54,7 @@ class Table {
    */
   std::shared_ptr<hbase::Result> Get(const hbase::Get &get);
 
-  // TODO: next jira
-  // std::vector<std::unique_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets);
+  std::vector<std::shared_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets);
 
   /**
    * @brief - Puts some data in the table.


Mime
View raw message