hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [hbase] 122/133: HBASE-18507 [C++] Support for MultiPuts in AsyncBatchRpcRetryingCaller class (Sudeep Sunthankar)
Date Tue, 12 Mar 2019 12:46:50 GMT
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 18eba56d1b37233ea74c4f1a023d2c1410c0db01
Author: Enis Soztutar <enis@apache.org>
AuthorDate: Fri Sep 1 10:55:57 2017 -0700

    HBASE-18507 [C++] Support for MultiPuts in AsyncBatchRpcRetryingCaller class (Sudeep Sunthankar)
---
 hbase-native-client/core/action.h                  |   9 +-
 .../core/async-batch-rpc-retrying-caller.cc        | 128 ++++++----
 .../core/async-batch-rpc-retrying-caller.h         |  12 +-
 .../core/async-batch-rpc-retrying-test.cc          | 275 +++++++++++++++------
 .../core/async-rpc-retrying-caller-factory.h       |  20 +-
 hbase-native-client/core/client-test.cc            | 148 +++++++++++
 hbase-native-client/core/raw-async-table.cc        |  31 ++-
 hbase-native-client/core/raw-async-table.h         |   7 +-
 hbase-native-client/core/request-converter.cc      |  18 +-
 hbase-native-client/core/table.cc                  |  13 +
 hbase-native-client/core/table.h                   |   6 +-
 11 files changed, 506 insertions(+), 161 deletions(-)

diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
index 21a0181..a00f079 100644
--- a/hbase-native-client/core/action.h
+++ b/hbase-native-client/core/action.h
@@ -20,22 +20,21 @@
 #pragma once
 
 #include <memory>
-#include "core/get.h"
+#include "core/row.h"
 
 namespace hbase {
-
 class Action {
  public:
-  Action(std::shared_ptr<hbase::Get> action, int32_t original_index)
+  Action(std::shared_ptr<hbase::Row> action, int32_t original_index)
       : action_(action), original_index_(original_index) {}
   ~Action() {}
 
   int32_t original_index() const { return original_index_; }
 
-  std::shared_ptr<hbase::Get> action() const { return action_; }
+  std::shared_ptr<hbase::Row> action() const { return action_; }
 
  private:
-  std::shared_ptr<hbase::Get> action_;
+  std::shared_ptr<hbase::Row> action_;
   int32_t original_index_;
   int64_t nonce_ = -1;
   int32_t replica_id_ = -1;
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
index 0d67b17..dfbf7e7 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -32,11 +32,12 @@ using std::chrono::milliseconds;
 
 namespace hbase {
 
-AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller(
     std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
-    std::shared_ptr<TableName> table_name, const std::vector<hbase::Get> &actions,
-    nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns,
-    nanoseconds rpc_timeout_ns, int32_t start_log_errors_count)
+    std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns,
+    int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns,
+    int32_t start_log_errors_count)
     : conn_(conn),
       retry_timer_(retry_timer),
       table_name_(table_name),
@@ -56,29 +57,31 @@ AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller(
   max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
   uint32_t index = 0;
   for (auto row : actions) {
-    actions_.push_back(std::make_shared<Action>(std::make_shared<hbase::Get>(row), index));
-    Promise<std::shared_ptr<Result>> prom{};
-    action2promises_.insert(
-        std::pair<uint64_t, Promise<std::shared_ptr<Result>>>(index, std::move(prom)));
+    actions_.push_back(std::make_shared<Action>(row, index));
+    Promise<RESP> prom{};
+    action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom)));
     action2futures_.push_back(action2promises_[index++].getFuture());
   }
 }
 
-AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() {}
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {}
 
-Future<std::vector<Try<std::shared_ptr<Result>>>> AsyncBatchRpcRetryingCaller::Call() {
+template <typename REQ, typename RESP>
+Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() {
   GroupAndSend(actions_, 1);
   return collectAll(action2futures_);
 }
 
-int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
+template <typename REQ, typename RESP>
+int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() {
   return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_);
 }
 
-void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
-                                               std::shared_ptr<RegionRequest> region_request,
-                                               const folly::exception_wrapper &ew,
-                                               std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
+    int32_t tries, std::shared_ptr<RegionRequest> region_request,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
   if (tries > start_log_errors_count_) {
     std::string regions;
     regions += region_request->region_location()->region_name() + ", ";
@@ -88,7 +91,8 @@ void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
   }
 }
 
-void AsyncBatchRpcRetryingCaller::LogException(
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
     int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
     const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
   if (tries > start_log_errors_count_) {
@@ -102,29 +106,35 @@ void AsyncBatchRpcRetryingCaller::LogException(
   }
 }
 
-const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
+template <typename REQ, typename RESP>
+const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError(
     std::shared_ptr<ServerName> server_name) {
   return server_name ? server_name->ShortDebugString() : "";
 }
 
-void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
-                                           const folly::exception_wrapper &ew,
-                                           std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action,
+                                                      const folly::exception_wrapper &ew,
+                                                      std::shared_ptr<ServerName> server_name) {
   ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
   AddAction2Error(action->original_index(), twec);
 }
 
-void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
-                                           const folly::exception_wrapper &ew,
-                                           std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(
+    const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew,
+    std::shared_ptr<ServerName> server_name) {
   for (const auto action : actions) {
     AddError(action, ew, server_name);
   }
 }
 
-void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
-                                          const folly::exception_wrapper &ew, int64_t current_time,
-                                          const std::string extras) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action,
+                                                     int32_t tries,
+                                                     const folly::exception_wrapper &ew,
+                                                     int64_t current_time,
+                                                     const std::string extras) {
   auto action_index = action->original_index();
   auto itr = action2promises_.find(action_index);
   if (itr != action2promises_.end()) {
@@ -138,16 +148,18 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
       RetriesExhaustedException(tries - 1, action2errors_[action_index]));
 }
 
-void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
-                                          int32_t tries, const folly::exception_wrapper &ew,
-                                          std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
   for (const auto action : actions) {
     FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
   }
 }
 
-void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
-                                          int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
   for (const auto action : actions) {
     auto action_index = action->original_index();
     auto itr = action2promises_.find(action_index);
@@ -159,8 +171,9 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti
   }
 }
 
-void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
-                                                  const ThrowableWithExtraContext &twec) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error(
+    uint64_t action_index, const ThrowableWithExtraContext &twec) {
   auto erritr = action2errors_.find(action_index);
   if (erritr != action2errors_.end()) {
     erritr->second->push_back(twec);
@@ -171,9 +184,11 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
   return;
 }
 
-void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
-                                          const folly::exception_wrapper &ew,
-                                          std::shared_ptr<ServerName> server_name) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region,
+                                                     int32_t tries,
+                                                     const folly::exception_wrapper &ew,
+                                                     std::shared_ptr<ServerName> server_name) {
   std::vector<std::shared_ptr<Action>> copied_actions;
   std::vector<std::shared_ptr<RegionRequest>> region_requests;
   for (const auto &action_by_region : actions_by_region) {
@@ -192,8 +207,9 @@ void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_regi
   TryResubmit(copied_actions, tries);
 }
 
-void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions,
-                                              int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
   int64_t delay_ns;
   if (operation_timeout_ns_.count() > 0) {
     int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
@@ -213,9 +229,10 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<
   });
 }
 
+template <typename REQ, typename RESP>
 Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
-AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_ptr<Action>> &actions,
-                                                int64_t locate_timeout_ns) {
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations(
+    const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) {
   auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{};
   for (auto const &action : actions) {
     locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(),
@@ -225,8 +242,9 @@ AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_pt
   return collectAll(locs);
 }
 
-void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions,
-                                               int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
   int64_t locate_timeout_ns;
   if (operation_timeout_ns_.count() > 0) {
     locate_timeout_ns = RemainingTimeNs();
@@ -300,8 +318,9 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
   return;
 }
 
-Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
-    const ActionsByServer &actions_by_server) {
+template <typename REQ, typename RESP>
+Future<std::vector<Try<std::unique_ptr<Response>>>>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) {
   auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
   auto user = User::defaultUser();
   for (const auto &action_by_server : actions_by_server) {
@@ -315,7 +334,9 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller:
   return collectAll(multi_calls);
 }
 
-void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
+                                                  int32_t tries) {
   int64_t remaining_ns;
   if (operation_timeout_ns_.count() > 0) {
     remaining_ns = RemainingTimeNs();
@@ -371,7 +392,8 @@ void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server,
   return;
 }
 
-void AsyncBatchRpcRetryingCaller::OnComplete(
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
     const ActionsByRegion &actions_by_region, int32_t tries,
     const std::shared_ptr<ServerName> server_name,
     const std::unique_ptr<hbase::MultiResponse> multi_response) {
@@ -418,12 +440,12 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
   return;
 }
 
-void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &action,
-                                             const std::shared_ptr<RegionRequest> &region_request,
-                                             int32_t tries,
-                                             const std::shared_ptr<ServerName> &server_name,
-                                             const std::shared_ptr<RegionResult> &region_result,
-                                             std::vector<std::shared_ptr<Action>> &failed_actions) {
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
+    const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request,
+    int32_t tries, const std::shared_ptr<ServerName> &server_name,
+    const std::shared_ptr<RegionResult> &region_result,
+    std::vector<std::shared_ptr<Action>> &failed_actions) {
   std::string err_msg;
   try {
     auto result_or_exc = region_result->ResultOrException(action->original_index());
@@ -461,4 +483,6 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti
   return;
 }
 
+template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>,
+                                           std::shared_ptr<hbase::Result>>;
 } /* namespace hbase */
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
index 194c439..9194b04 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -84,6 +84,7 @@ struct ServerNameHash {
   }
 };
 
+template <typename REQ, typename RESP>
 class AsyncBatchRpcRetryingCaller {
  public:
   using ActionsByServer =
@@ -94,15 +95,14 @@ class AsyncBatchRpcRetryingCaller {
   AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
                               std::shared_ptr<folly::HHWheelTimer> retry_timer,
                               std::shared_ptr<pb::TableName> table_name,
-                              const std::vector<hbase::Get> &actions,
-                              std::chrono::nanoseconds pause_ns, int32_t max_attempts,
-                              std::chrono::nanoseconds operation_timeout_ns,
+                              const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns,
+                              int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns,
                               std::chrono::nanoseconds rpc_timeout_ns,
                               int32_t start_log_errors_count);
 
   ~AsyncBatchRpcRetryingCaller();
 
-  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call();
+  folly::Future<std::vector<folly::Try<RESP>>> Call();
 
  private:
   int64_t RemainingTimeNs();
@@ -172,8 +172,8 @@ class AsyncBatchRpcRetryingCaller {
 
   int64_t start_ns_ = TimeUtil::GetNowNanos();
   int32_t tries_ = 1;
-  std::map<uint64_t, folly::Promise<std::shared_ptr<Result>>> action2promises_;
-  std::vector<folly::Future<std::shared_ptr<Result>>> action2futures_;
+  std::map<uint64_t, folly::Promise<RESP>> action2promises_;
+  std::vector<folly::Future<RESP>> action2futures_;
   std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_;
 
   std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr;
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
index cad03e1..b8a0b81 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
@@ -230,7 +230,12 @@ class MockAsyncConnection : public AsyncConnection,
     return retry_executor_;
   }
 
-  void Close() override {}
+  void Close() override {
+    retry_timer_->destroy();
+    retry_executor_->stop();
+    io_executor_->stop();
+    cpu_executor_->stop();
+  }
   std::shared_ptr<HBaseRpcController> CreateRpcController() override {
     return std::make_shared<HBaseRpcController>();
   }
@@ -254,15 +259,15 @@ class MockRawAsyncTableImpl {
   virtual ~MockRawAsyncTableImpl() = default;
 
   /* implement this in real RawAsyncTableImpl. */
-  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets(
-      const std::vector<hbase::Get> &gets) {
+  template <typename REQ, typename RESP>
+  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
     /* init request caller builder */
-    auto builder = conn_->caller_factory()->Batch();
+    auto builder = conn_->caller_factory()->Batch<REQ, RESP>();
 
     /* call with retry to get result */
     auto async_caller =
         builder->table(tn_)
-            ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
+            ->actions(std::make_shared<std::vector<REQ>>(rows))
             ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
             ->operation_timeout(conn_->connection_conf()->operation_timeout())
             ->pause(conn_->connection_conf()->pause())
@@ -278,9 +283,7 @@ class MockRawAsyncTableImpl {
   std::shared_ptr<hbase::pb::TableName> tn_;
 };
 
-void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
-                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
-                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+std::string createTestTable(bool split_regions, const std::string &table_name) {
   std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
                                 "test500", "test600", "test700", "test800", "test900"};
   std::string tableName = (split_regions) ? ("split-" + table_name) : table_name;
@@ -289,30 +292,12 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
   } else {
     AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d");
   }
+  return tableName;
+}
 
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>(tableName);
-
-  // Create a client
-  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
-
-  // Get connection to HBase Table
-  auto table = client.Table(tn);
-
-  for (uint64_t i = 0; i < num_rows; i++) {
-    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
-                                                         "value" + std::to_string(i)));
-  }
-
-  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
-  std::vector<hbase::Get> gets;
-  for (uint64_t i = 0; i < num_rows; ++i) {
-    auto row = "test" + std::to_string(i);
-    hbase::Get get(row);
-    gets.push_back(get);
-    region_locations[row] = table->GetRegionLocation(row);
-  }
-
+std::shared_ptr<MockAsyncConnection> getAsyncConnection(
+    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
+    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
   /* init region location and rpc channel */
   auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io_executor_ = client.async_connection()->io_executor();
@@ -332,35 +317,90 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
       tries,                                              // max retries
       1);                                                 // start log errors count
 
-  /* set region locator */
-  region_locator->set_region_location(region_locations);
-
-  /* init hbase client connection */
-  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
-                                                    io_executor_, retry_executor_, rpc_client,
-                                                    region_locator);
-  conn->Init();
-
-  /* init retry caller factory */
-  auto tableImpl =
-      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+  return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                               io_executor_, retry_executor_, rpc_client,
+                                               region_locator);
+}
 
-  auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis));
+template <typename ACTION>
+std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  for (auto action : actions) {
+    std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action);
+    rows.push_back(srow);
+  }
+  return rows;
+}
 
-  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+template <typename REQ, typename RESP>
+std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
+                                                       std::vector<folly::Try<RESP>> &tresults) {
   std::vector<std::shared_ptr<hbase::Result>> results{};
-  uint32_t num = 0;
+  uint64_t num = 0;
   for (auto tresult : tresults) {
     if (tresult.hasValue()) {
       results.push_back(tresult.value());
     } else if (tresult.hasException()) {
       folly::exception_wrapper ew = tresult.exception();
-      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row();
+      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
+                 << actions[num].row();
       throw ew;
     }
     ++num;
   }
+  return results;
+}
+
+template <typename ACTION>
+std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
+    uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    auto row = "test" + std::to_string(i);
+    ACTION action(row);
+    actions.push_back(action);
+    region_locations[row] = table->GetRegionLocation(row);
+  }
+  return region_locations;
+}
+
+void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
+  auto tableName = createTestTable(split_regions, table_name);
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+  // Create a client
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+  // Get connection to HBase Table
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+  std::vector<hbase::Get> gets;
+  auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
 
+  /* init hbase client connection */
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
+  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+      milliseconds(operation_timeout_millis));
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
   // Test the values, should be same as in put executed on hbase shell
   ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
   uint32_t i = 0;
@@ -371,101 +411,184 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
     EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
   }
 
-  retry_timer->destroy();
   table->Close();
   client.Close();
-  retry_executor_->stop();
-  io_executor_->stop();
-  cpu_executor_->stop();
+  conn->Close();
+}
+
+void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
+  auto tableName = createTestTable(split_regions, table_name);
+
+  // Create the TableName of the table the Puts will be written to
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+  // Create a client
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+  // Get connection to HBase Table
+  std::shared_ptr<Table> table = client.Table(tn);
+
+  std::vector<hbase::Put> puts;
+  auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
+
+  /* init hbase client connection */
+  auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
+  conn->Init();
+
+  /* init retry caller factory */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
+  auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
+      milliseconds(operation_timeout_millis));
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+
+  auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
+  // Only check that the batch of Puts returned results; values are not read back here
+  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+
+  table->Close();
+  client.Close();
+  conn->Close();
 }
 
 // Test successful case
 TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockAsyncRegionLocator>());
-  runMultiTest(region_locator, "table1", false);
+  runMultiGets(region_locator, "table1", false);
 }
 
 // Tests the RPC failing 3 times, then succeeding
 TEST_F(AsyncBatchRpcRetryTest, HandleException) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
-  runMultiTest(region_locator, "table2", false, 5);
+  runMultiGets(region_locator, "table2", false, 5);
 }
 
 // Tests the RPC failing 4 times, throwing an exception
 TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", false));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false));
 }
 
 // Tests the region location lookup failing 3 times, then succeeding
 TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(3));
-  runMultiTest(region_locator, "table4", false);
+  runMultiGets(region_locator, "table4", false);
 }
 
 // Tests the region location lookup failing 5 times, throwing an exception
 TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", false, 3));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3));
 }
 
 // Tests hitting operation timeout, thus not retrying anymore
 TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(6));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", false, 5, 100, 1000));
+  EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000));
 }
 
-/*
-  TODO: Below tests are failing with frequently with segfaults coming from
-  JNI internals indicating that we are doing something wrong in the JNI boundary.
-  However, we were not able to debug furhter yet. Disable the tests for now, and
-  come back later to fix the issue.
-
+//////////////////////
 // Test successful case
-TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockAsyncRegionLocator>());
-  runMultiTest(region_locator, "table7", true);
+  runMultiPuts(region_locator, "table1", false);
 }
 
 // Tests the RPC failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
-  runMultiTest(region_locator, "table8", true, 5);
+  runMultiPuts(region_locator, "table2", false, 5);
 }
 
 // Tests the RPC failing 4 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table9", true));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false));
 }
 
 // Tests the region location lookup failing 3 times, then succeeding
-TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(3));
-  runMultiTest(region_locator, "table10", true);
+  runMultiPuts(region_locator, "table4", false);
 }
 
 // Tests the region location lookup failing 5 times, throwing an exception
-TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(4));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table11", true, 3));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3));
 }
 
 // Tests hitting operation timeout, thus not retrying anymore
-TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
   std::shared_ptr<AsyncRegionLocatorBase> region_locator(
       std::make_shared<MockFailingAsyncRegionLocator>(6));
-  EXPECT_ANY_THROW(runMultiTest(region_locator, "table12", true, 5, 100, 1000));
+  EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000));
 }
-*/
+
+//////////////////////
+/*
+ TODO: The tests below fail frequently with segfaults coming from
+ JNI internals, indicating that we are doing something wrong at the JNI boundary.
+ However, we were not able to debug further yet. Disable the tests for now, and
+ come back later to fix the issue.
+
+ // Test successful case
+ TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockAsyncRegionLocator>());
+ runMultiGets(region_locator, "table7", true);
+ }
+
+ // Tests the RPC failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table8", true, 5);
+ }
+
+ // Tests the RPC failing 4 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true));
+ }
+
+ // Tests the region location lookup failing 3 times, then succeeding
+ TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(3));
+ runMultiGets(region_locator, "table10", true);
+ }
+
+ // Tests the region location lookup failing 5 times, throwing an exception
+ TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(4));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3));
+ }
+
+ // Tests hitting operation timeout, thus not retrying anymore
+ TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+ std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+ std::make_shared<MockFailingAsyncRegionLocator>(6));
+ EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000));
+ }
+ */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 1af6e72..188f469 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -135,7 +135,8 @@ class SingleRequestCallerBuilder
   Callable<RESP> callable_;
 };  // end of SingleRequestCallerBuilder
 
-class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder> {
+template <typename REQ, typename RESP>
+class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> {
  public:
   explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn,
                               std::shared_ptr<folly::HHWheelTimer> retry_timer)
@@ -143,14 +144,14 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
 
   virtual ~BatchCallerBuilder() = default;
 
-  typedef std::shared_ptr<BatchCallerBuilder> SharedThisPtr;
+  typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr;
 
   SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
     table_name_ = table_name;
     return shared_this();
   }
 
-  SharedThisPtr actions(std::shared_ptr<std::vector<hbase::Get>> actions) {
+  SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) {
     actions_ = actions;
     return shared_this();
   }
@@ -180,10 +181,10 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
     return shared_this();
   }
 
-  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call() { return Build()->Call(); }
+  folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); }
 
-  std::shared_ptr<AsyncBatchRpcRetryingCaller> Build() {
-    return std::make_shared<AsyncBatchRpcRetryingCaller>(
+  std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() {
+    return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>(
         conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_,
         operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_);
   }
@@ -197,7 +198,7 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
   std::shared_ptr<AsyncConnection> conn_;
   std::shared_ptr<folly::HHWheelTimer> retry_timer_;
   std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr;
-  std::shared_ptr<std::vector<hbase::Get>> actions_ = nullptr;
+  std::shared_ptr<std::vector<REQ>> actions_ = nullptr;
   std::chrono::nanoseconds pause_ns_;
   int32_t max_attempts_ = 0;
   std::chrono::nanoseconds operation_timeout_nanos_;
@@ -329,8 +330,9 @@ class AsyncRpcRetryingCallerFactory {
     return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_);
   }
 
-  std::shared_ptr<BatchCallerBuilder> Batch() {
-    return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
+  template <typename REQ, typename RESP>
+  std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() {
+    return std::make_shared<BatchCallerBuilder<REQ, RESP>>(conn_, retry_timer_);
   }
 
   std::shared_ptr<ScanCallerBuilder> Scan() {
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 1c9b709..3f72880 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -547,3 +547,151 @@ TEST_F(ClientTest, MultiGetsWithRegionSplits) {
   table->Close();
   client.Close();
 }
+
+void PerformMultiPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client,
+                      const std::string &table_name) {
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+  std::vector<hbase::Put> puts;
+  // Perform Puts
+  for (uint64_t i = 0; i < num_rows; i++) {
+    puts.push_back(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                             "value" + std::to_string(i)));
+  }
+  table->Put(puts);
+}
+
+void PerformMultiPuts(std::vector<hbase::Put> &puts, std::shared_ptr<Table> table) {
+  table->Put(puts);
+}
+
+TEST_F(ClientTest, MultiGetsWithMultiPuts) {
+  std::string table_name = "t";
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  uint64_t num_rows = 50000;
+  PerformMultiPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  std::vector<hbase::Get> gets;
+  MakeGets(num_rows, "test", gets);
+
+  auto results = table->Get(gets);
+
+  TestMultiResults(num_rows, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+TEST_F(ClientTest, MultiGetsWithMultiPutsAndSplitRegions) {
+  // Using TestUtil to populate test data
+  std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  uint64_t num_rows = 50000;
+  PerformMultiPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  std::vector<hbase::Get> gets;
+  MakeGets(num_rows, "test", gets);
+
+  auto results = table->Get(gets);
+
+  TestMultiResults(num_rows, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+TEST_F(ClientTest, MultiPuts) {
+  std::string table_name = "t";
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t num_rows = 80000;
+  uint64_t batch_num_rows = 10000;
+  std::vector<hbase::Put> puts;
+  for (uint64_t i = 0; i < num_rows;) {
+    puts.clear();
+    // accumulate batch_num_rows at a time
+    for (uint64_t j = 0; j < batch_num_rows && i < num_rows; ++j) {
+      hbase::Put put("test" + std::to_string(i));
+      put.AddColumn("d", std::to_string(i), "value" + std::to_string(i));
+      puts.push_back(put);
+      i++;
+    }
+    PerformMultiPuts(puts, table);
+  }
+  table->Close();
+  client.Close();
+}
+
+TEST_F(ClientTest, MultiPutsWithRegionSplits) {
+  // Using TestUtil to populate test data
+  std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+  std::shared_ptr<Table> table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  uint64_t num_rows = 80000;
+  uint64_t batch_num_rows = 10000;
+  std::vector<hbase::Put> puts;
+  for (uint64_t i = 0; i < num_rows;) {
+    puts.clear();
+    // accumulate batch_num_rows at a time
+    for (uint64_t j = 0; j < batch_num_rows && i < num_rows; ++j) {
+      hbase::Put put("test" + std::to_string(i));
+      put.AddColumn("d", std::to_string(i), "value" + std::to_string(i));
+      puts.push_back(put);
+      i++;
+    }
+    PerformMultiPuts(puts, table);
+  }
+  table->Close();
+  client.Close();
+}
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 53ab526..409883f 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -197,18 +197,26 @@ folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append
 
   return caller->Call().then([caller](const auto r) { return r; });
 }
+
 folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
     const std::vector<hbase::Get>& gets) {
-  return this->Batch(gets);
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  for (auto get : gets) {
+    std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Get>(get);
+    rows.push_back(srow);
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->read_rpc_timeout());
 }
 
-folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Batch(
-    const std::vector<hbase::Get>& gets) {
+template <typename REQ, typename RESP>
+folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch(
+    const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) {
   auto caller = connection_->caller_factory()
-                    ->Batch()
+                    ->Batch<REQ, RESP>()
                     ->table(table_name_)
-                    ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
-                    ->rpc_timeout(connection_conf_->read_rpc_timeout())
+                    ->actions(std::make_shared<std::vector<REQ>>(rows))
+                    ->rpc_timeout(timeout)
                     ->operation_timeout(connection_conf_->operation_timeout())
                     ->pause(connection_conf_->pause())
                     ->max_attempts(connection_conf_->max_retries())
@@ -237,4 +245,15 @@ std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Sc
   }
   return new_scan;
 }
+
+folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put(
+    const std::vector<hbase::Put>& puts) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  for (auto put : puts) {
+    std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Put>(put);
+    rows.push_back(srow);
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->write_rpc_timeout());
+}
 }  // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index e651f8a..97eef7f 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -83,8 +83,11 @@ class RawAsyncTable {
 
   folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
       const std::vector<hbase::Get>& gets);
-  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Batch(
-      const std::vector<hbase::Get>& gets);
+  template <typename REQ, typename RESP>
+  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ>& rows,
+                                                     std::chrono::nanoseconds timeout);
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Put(
+      const std::vector<hbase::Put>& puts);
 
  private:
   /* Data */
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 47c09d1..f48f228 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -173,14 +173,23 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
       auto pb_action = pb_region_action->add_action();
       auto pget = region_action->action();
       // We store only hbase::Get in hbase::Action as of now. It will be changed later on.
-      CHECK(pget) << "Unexpected. action can't be null";
-      auto pb_get = RequestConverter::ToGet(*pget);
-      pb_action->set_allocated_get(pb_get.release());
+      CHECK(pget) << "Unexpected. action can't be null.";
+      std::string error_msg("");
+      if (typeid(*pget) == typeid(hbase::Get)) {
+        auto getp = dynamic_cast<hbase::Get *>(pget.get());
+        pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release());
+      } else if (typeid(*pget) == typeid(hbase::Put)) {
+        auto putp = dynamic_cast<hbase::Put *>(pget.get());
+        pb_action->set_allocated_mutation(
+            RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1)
+                .release());
+      } else {
+        throw std::runtime_error("Unexpected action type encountered.");
+      }
       pb_action->set_index(action_num);
       action_num++;
     }
   }
-
   return pb_req;
 }
 
@@ -355,4 +364,5 @@ std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &a
   VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
+
 } /* namespace hbase */
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 3b7a87b..f93a029 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -128,4 +128,17 @@ std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::
   return results;
 }
 
+void Table::Put(const std::vector<hbase::Put> &puts) {
+  auto tresults = async_table_->Put(puts).get(operation_timeout());
+  uint32_t num = 0;
+  for (auto tresult : tresults) {
+    if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for "
+                 << puts[num++].row();
+      throw tresult.exception();
+    }
+  }
+  return;
+}
+
 } /* namespace hbase */
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index cc37182..6340494 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -119,11 +119,15 @@ class Table {
    * @param - append Append object to perform HBase Append operation.
    */
   std::shared_ptr<hbase::Result> Append(const hbase::Append &append);
-  // TODO: Batch Puts
 
   std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
 
   /**
+     * @brief - Multi Puts.
+     * @param - puts vector of hbase::Put.
+     */
+  void Put(const std::vector<hbase::Put> &puts);
+  /**
    * @brief - Close the client connection.
    */
   void Close();


Mime
View raw message