hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [2/2] hbase git commit: HBASE-18061 [C++] Fix retry logic in multi-get calls (Sudeep Sunthankar)
Date Wed, 19 Jul 2017 18:58:59 GMT
HBASE-18061 [C++] Fix retry logic in multi-get calls (Sudeep Sunthankar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a93c6a99
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a93c6a99
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a93c6a99

Branch: refs/heads/HBASE-14850
Commit: a93c6a99857dcc69a4c39e9603b54dbac271baa9
Parents: 9effc92
Author: Enis Soztutar <enis@apache.org>
Authored: Wed Jul 19 11:50:40 2017 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Wed Jul 19 11:50:44 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/core/BUCK                   |  11 +
 .../core/async-batch-rpc-retrying-caller.cc     | 241 +++++-----
 .../core/async-batch-rpc-retrying-caller.h      |  28 +-
 .../core/async-batch-rpc-retrying-test.cc       | 463 +++++++++++++++++++
 hbase-native-client/core/client-test.cc         |  91 +++-
 hbase-native-client/core/multi-response.cc      |  34 +-
 hbase-native-client/core/multi-response.h       |  14 +-
 hbase-native-client/core/region-result.cc       |   2 +-
 hbase-native-client/core/region-result.h        |   5 +-
 hbase-native-client/core/response-converter.cc  |  50 +-
 hbase-native-client/core/response-converter.h   |   8 +-
 hbase-native-client/core/server-request.h       |  10 +-
 12 files changed, 753 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 464c010..f9db0bd 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -323,6 +323,17 @@ cxx_test(
         ":core",
     ],
     run_test_separately=True,)
+cxx_test(
+    name="multi-retry-test",
+    srcs=[
+        "async-batch-rpc-retrying-test.cc",
+    ],
+    deps=[
+        ":core",
+        "//test-util:test-util",
+        "//exceptions:exceptions",
+    ],
+    run_test_separately=True,)
 cxx_binary(
     name="simple-client",
     srcs=[

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
index 05290f5..0d67b17 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc
@@ -77,20 +77,20 @@ int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() {
 
 void AsyncBatchRpcRetryingCaller::LogException(int32_t tries,
                                                std::shared_ptr<RegionRequest> region_request,
-                                               std::shared_ptr<std::exception> &error,
+                                               const folly::exception_wrapper &ew,
                                                std::shared_ptr<ServerName> server_name) {
   if (tries > start_log_errors_count_) {
     std::string regions;
     regions += region_request->region_location()->region_name() + ", ";
     LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
                  << table_name_->qualifier() << " from " << server_name->host_name()
-                 << " failed, tries=" << tries << ":- " << error->what();
+                 << " failed, tries=" << tries << ":- " << ew.what().toStdString();
   }
 }
 
 void AsyncBatchRpcRetryingCaller::LogException(
-    int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
-    std::shared_ptr<std::exception> &error, std::shared_ptr<ServerName> server_name) {
+    int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
   if (tries > start_log_errors_count_) {
     std::string regions;
     for (const auto region_request : region_requests) {
@@ -98,7 +98,7 @@ void AsyncBatchRpcRetryingCaller::LogException(
     }
     LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
                  << table_name_->qualifier() << " from " << server_name->host_name()
-                 << " failed, tries=" << tries << error->what();
+                 << " failed, tries=" << tries << ew.what().toStdString();
   }
 }
 
@@ -107,27 +107,24 @@ const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError(
   return server_name ? server_name->ShortDebugString() : "";
 }
 
-// TODO HBASE-17800 pass folly ew instead of std::exception
 void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action,
-                                           std::shared_ptr<std::exception> error,
+                                           const folly::exception_wrapper &ew,
                                            std::shared_ptr<ServerName> server_name) {
-  folly::exception_wrapper ew;
   ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
   AddAction2Error(action->original_index(), twec);
 }
 
 void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions,
-                                           std::shared_ptr<std::exception> error,
+                                           const folly::exception_wrapper &ew,
                                            std::shared_ptr<ServerName> server_name) {
   for (const auto action : actions) {
-    AddError(action, error, server_name);
+    AddError(action, ew, server_name);
   }
 }
 
-// TODO HBASE-17800 pass folly ew instead of std::exception
 void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries,
-                                          std::shared_ptr<std::exception> error,
-                                          int64_t current_time, const std::string extras) {
+                                          const folly::exception_wrapper &ew, int64_t current_time,
+                                          const std::string extras) {
   auto action_index = action->original_index();
   auto itr = action2promises_.find(action_index);
   if (itr != action2promises_.end()) {
@@ -135,7 +132,6 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
       return;
     }
   }
-  folly::exception_wrapper ew;
   ThrowableWithExtraContext twec(ew, current_time, extras);
   AddAction2Error(action_index, twec);
   action2promises_[action_index].setException(
@@ -143,10 +139,10 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action,
 }
 
 void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions,
-                                          int32_t tries, std::shared_ptr<std::exception> error,
+                                          int32_t tries, const folly::exception_wrapper &ew,
                                           std::shared_ptr<ServerName> server_name) {
   for (const auto action : actions) {
-    FailOne(action, tries, error, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+    FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
   }
 }
 
@@ -159,7 +155,7 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti
       return;
     }
     action2promises_[action_index].setException(
-        RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+        RetriesExhaustedException(tries, action2errors_[action_index]));
   }
 }
 
@@ -176,34 +172,32 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index,
 }
 
 void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries,
-                                          std::shared_ptr<std::exception> exc,
+                                          const folly::exception_wrapper &ew,
                                           std::shared_ptr<ServerName> server_name) {
   std::vector<std::shared_ptr<Action>> copied_actions;
   std::vector<std::shared_ptr<RegionRequest>> region_requests;
   for (const auto &action_by_region : actions_by_region) {
     region_requests.push_back(action_by_region.second);
-    // Concurrent
     for (const auto &action : action_by_region.second->actions()) {
       copied_actions.push_back(action);
     }
   }
-  // TODO HBASE-17800 for exc check with DoNotRetryIOException
-  LogException(tries, region_requests, exc, server_name);
-  if (tries >= max_attempts_) {
-    FailAll(copied_actions, tries, exc, server_name);
+
+  LogException(tries, region_requests, ew, server_name);
+  if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) {
+    FailAll(copied_actions, tries, ew, server_name);
     return;
   }
-  AddError(copied_actions, exc, server_name);
+  AddError(copied_actions, ew, server_name);
   TryResubmit(copied_actions, tries);
 }
 
-void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action>> actions,
+void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions,
                                               int32_t tries) {
   int64_t delay_ns;
   if (operation_timeout_ns_.count() > 0) {
     int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
     if (max_delay_ns <= 0) {
-      VLOG(8) << "Fail All from onError";
       FailAll(actions, tries);
       return;
     }
@@ -211,9 +205,12 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action
   } else {
     delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1);
   }
-  // TODO This gives segfault @ present, when retried
-  //    retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); },
-  //                                  milliseconds(TimeUtil::ToMillis(delay_ns)));
+
+  conn_->retry_executor()->add([=]() {
+    retry_timer_->scheduleTimeoutFn(
+        [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); },
+        milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
 }
 
 Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
@@ -234,7 +231,7 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
   if (operation_timeout_ns_.count() > 0) {
     locate_timeout_ns = RemainingTimeNs();
     if (locate_timeout_ns <= 0) {
-      FailAll(actions_, tries);
+      FailAll(actions, tries);
       return;
     }
   } else {
@@ -242,8 +239,8 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
   }
 
   GetRegionLocations(actions, locate_timeout_ns)
-      .then([&](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
-        std::lock_guard<std::mutex> lock(multi_mutex_);
+      .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
         ActionsByServer actions_by_server;
         std::vector<std::shared_ptr<Action>> locate_failed;
 
@@ -252,50 +249,36 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
           if (loc[i].hasValue()) {
             auto region_loc = loc[i].value();
             // Add it to actions_by_server;
-            // Concurrent
             auto search =
                 actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
             if (search != actions_by_server.end()) {
               search->second->AddActionsByRegion(region_loc, action);
             } else {
-              // Create new key
               auto server_request = std::make_shared<ServerRequest>(region_loc);
               server_request->AddActionsByRegion(region_loc, action);
               auto server_name = std::make_shared<ServerName>(region_loc->server_name());
               actions_by_server[server_name] = server_request;
             }
-            locate_failed.push_back(action);
-            VLOG(8) << "row [" << action->action()->row() << "] of table["
-                    << table_name_->namespace_() << ":" << table_name_->qualifier()
-                    << " found in region [" << region_loc->region_name() << "]; host["
-                    << region_loc->server_name().host_name() << "]; port["
+            VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
+                    << table_name_->ShortDebugString() << "] found in ["
+                    << region_loc->region_name() << "]; RS["
+                    << region_loc->server_name().host_name() << ":"
                     << region_loc->server_name().port() << "];";
           } else if (loc[i].hasException()) {
-            VLOG(8) << "Exception occured while locating region:- "
-                    << loc[i].exception().getCopied()->what() << " for action index " << i;
-            // TODO Feedback needed, Java API only identifies DoNotRetryIOException
-            // We might receive runtime error from location-cache.cc too, we are treating both same
-            if (loc[i].exception().is_compatible_with<std::runtime_error>()) {
-              std::string extra = "";
-              FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
-                      loc[i].exception().what().toStdString());
-              return;
+            folly::exception_wrapper ew = loc[i].exception();
+            VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+                    << "for index:" << i << "; tries: " << tries
+                    << "; max_attempts_: " << max_attempts_;
+            // We might receive runtime error from location-cache.cc too, we are doing FailOne and
+            // continue next one
+            if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+              FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
+            } else {
+              AddError(action, loc[i].exception(), nullptr);
+              locate_failed.push_back(action);
             }
-            // TODO HBASE-17800 for exc check with DoNotRetryIOException
-            /*
-             else if (loc[i].exception().is_compatible_with<hbase::DoNotRetryIOException>()) {
-             int64_t current_time = 0;
-             std::string extra = "";
-             FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(),
-             loc[i].exception().what().toStdString());
-             return;
-             }*/
-            AddError(action, std::make_shared<std::exception>(*loc[i].exception().getCopied()),
-                     nullptr);
-            locate_failed.push_back(action);
           }
         }
-
         if (!actions_by_server.empty()) {
           Send(actions_by_server, tries);
         }
@@ -304,17 +287,21 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr
           TryResubmit(locate_failed, tries);
         }
       })
-      .onError([&](const folly::exception_wrapper &ew) {
-        std::lock_guard<std::mutex> lock(multi_mutex_);
-        auto exc = ew.getCopied();
-        VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString();
+      .onError([=](const folly::exception_wrapper &ew) {
+        VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+                << "tries: " << tries << "; max_attempts_: " << max_attempts_;
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+          FailAll(actions, tries, ew, nullptr);
+        } else {
+          TryResubmit(actions, tries);
+        }
       });
   return;
 }
 
 Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse(
     const ActionsByServer &actions_by_server) {
-  // Concurrent.
   auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{};
   auto user = User::defaultUser();
   for (const auto &action_by_server : actions_by_server) {
@@ -328,16 +315,14 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller:
   return collectAll(multi_calls);
 }
 
-void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) {
+void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) {
   int64_t remaining_ns;
   if (operation_timeout_ns_.count() > 0) {
     remaining_ns = RemainingTimeNs();
     if (remaining_ns <= 0) {
       std::vector<std::shared_ptr<Action>> failed_actions;
       for (const auto &action_by_server : actions_by_server) {
-        // Concurrent
         for (auto &value : action_by_server.second->actions_by_region()) {
-          // Concurrent
           for (const auto &failed_action : value.second->actions()) {
             failed_actions.push_back(failed_action);
           }
@@ -357,30 +342,30 @@ void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32
 
   GetMultiResponse(actions_by_server)
       .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
-        std::lock_guard<std::mutex> lock(multi_mutex_);
-        for (uint64_t num = 0; num < completed_responses.size(); ++num) {
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        uint64_t num = 0;
+        for (const auto &action_by_server : actions_by_server) {
           if (completed_responses[num].hasValue()) {
             auto multi_response =
-                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value());
-            for (const auto &action_by_server : actions_by_server) {
-              OnComplete(action_by_server.second->actions_by_region(), tries,
-                         action_by_server.first, std::move(multi_response));
-            }
+                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(),
+                                              action_by_server.second->actions_by_region());
+            OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first,
+                       std::move(multi_response));
           } else if (completed_responses[num].hasException()) {
-            VLOG(8) << "Received exception: "
-                    << completed_responses[num].exception().getCopied()->what()
-                    << " from server for action index " << num;
-            // TODO: we should call OnError here as well.
+            folly::exception_wrapper ew = completed_responses[num].exception();
+            VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
+                    << " from server for action index:" << num;
+            OnError(action_by_server.second->actions_by_region(), tries, ew,
+                    action_by_server.first);
           }
+          num++;
         }
       })
       .onError([=](const folly::exception_wrapper &ew) {
-        auto exc = ew.getCopied();
-        VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString();
-        std::lock_guard<std::mutex> lock(multi_mutex_);
+        VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
         for (const auto &action_by_server : actions_by_server) {
-          OnError(action_by_server.second->actions_by_region(), tries,
-                  std::make_shared<std::exception>(*exc), action_by_server.first);
+          OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first);
         }
       });
   return;
@@ -391,46 +376,35 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
     const std::shared_ptr<ServerName> server_name,
     const std::unique_ptr<hbase::MultiResponse> multi_response) {
   std::vector<std::shared_ptr<Action>> failed_actions;
+  const auto region_results = multi_response->RegionResults();
   for (const auto &action_by_region : actions_by_region) {
-    auto region_result_itr = multi_response->RegionResults().find(action_by_region.first);
-    if (region_result_itr == multi_response->RegionResults().end()) {
-      VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults.";
-      // TODO Feedback needed Should we throw from here or continue for next action_by_region ?
-      // Throwing at present as this looks like an inconsistency
-      // Concurrent
-      auto exc = std::make_shared<std::runtime_error>("Invalid search for region " +
-                                                      action_by_region.first + " in multi results");
-      FailAll(action_by_region.second->actions(), tries, exc, server_name);
-      return;
-      // std::runtime_error(
-      //  "Invalid search for region " + action_by_region.first + " in multi results");
-    }
-    if (region_result_itr != multi_response->RegionResults().end()) {
-      // Concurrent
+    auto region_result_itr = region_results.find(action_by_region.first);
+    if (region_result_itr != region_results.end()) {
       for (const auto &action : action_by_region.second->actions()) {
         OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
                    failed_actions);
       }
-    } else {
+    } else if (region_result_itr == region_results.end()) {
       auto region_exc = multi_response->RegionException(action_by_region.first);
-      std::shared_ptr<std::exception> pexc;
       if (region_exc == nullptr) {
-        VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first;
-        pexc = std::make_shared<std::exception>(std::runtime_error("Invalid response"));
-        // TODO: raise this exception to the application
+        // FailAll actions for this particular region as inconsistent server response. So we raise
+        // this exception to the application
+        std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+                              " sent us neither results nor exceptions for " +
+                              action_by_region.first;
+        VLOG(1) << err_msg;
+        auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+        FailAll(action_by_region.second->actions(), tries, ew, server_name);
       } else {
-        // TODO HBASE-17800 for exc check with DoNotRetryIOException
-        LogException(tries, action_by_region.second, region_exc, server_name);
-        location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
-                                              *region_exc);
-        std::string row_name;
-        if (tries >= max_attempts_) {
-          // Concurrent
-          FailAll(action_by_region.second->actions(), tries, region_exc, server_name);
+        // Eg: org.apache.hadoop.hbase.NotServingRegionException:
+        LogException(tries, action_by_region.second, *region_exc, server_name);
+        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
+          FailAll(action_by_region.second->actions(), tries, *region_exc, server_name);
           return;
         }
-        // Concurrent
-        AddError(action_by_region.second->actions(), region_exc, server_name);
+        location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
+                                              *region_exc);
+        AddError(action_by_region.second->actions(), *region_exc, server_name);
         for (const auto &action : action_by_region.second->actions()) {
           failed_actions.push_back(action);
         }
@@ -440,6 +414,7 @@ void AsyncBatchRpcRetryingCaller::OnComplete(
   if (!failed_actions.empty()) {
     TryResubmit(failed_actions, tries);
   }
+
   return;
 }
 
@@ -454,34 +429,36 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti
     auto result_or_exc = region_result->ResultOrException(action->original_index());
     auto result = std::get<0>(*result_or_exc);
     auto exc = std::get<1>(*result_or_exc);
-    std::shared_ptr<std::exception> pexc;
     if (exc != nullptr) {
-      LogException(tries, region_request, exc, server_name);
-      if (tries >= max_attempts_) {
-        FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+      LogException(tries, region_request, *exc, server_name);
+      if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) {
+        FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
       } else {
         failed_actions.push_back(action);
       }
     } else if (result != nullptr) {
       action2promises_[action->original_index()].setValue(std::move(result));
     } else {
-      VLOG(8) << "Server " << server_name->ShortDebugString()
-              << " sent us neither results nor exceptions for request @ index "
-              << action->original_index() << ", row " << action->action()->row() << " of "
-              << region_request->region_location()->region_name();
-      err_msg = "Invalid response";
-      AddError(action, std::make_shared<std::runtime_error>(err_msg), server_name);
+      std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+                            " sent us neither results nor exceptions for request @ index " +
+                            std::to_string(action->original_index()) + ", row " +
+                            action->action()->row() + " of " +
+                            region_request->region_location()->region_name();
+      VLOG(1) << err_msg;
+      auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+      AddError(action, ew, server_name);
       failed_actions.push_back(action);
     }
   } catch (const std::out_of_range &oor) {
-    // TODO Feedback needed. Should we retry for he specific index again ?
-    // This should never occur, so we are throwing a std::runtime_error from here
-    VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row "
-            << action->action()->row() << " of "
-            << region_request->region_location()->region_name();
-    throw std::runtime_error("ResultOrException not present @ index " +
-                             std::to_string(action->original_index()));
+    // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be
+    // retried or failed
+    std::string err_msg = "ResultOrException not present @ index " +
+                          std::to_string(action->original_index()) + ", row " +
+                          action->action()->row() + " of " +
+                          region_request->region_location()->region_name();
+    throw std::runtime_error(err_msg);
   }
   return;
 }
+
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-caller.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
index 29a0e6a..4c04b91 100644
--- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <folly/Format.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/Promise.h>
@@ -107,36 +108,36 @@ class AsyncBatchRpcRetryingCaller {
   int64_t RemainingTimeNs();
 
   void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request,
-                    std::shared_ptr<std::exception> &error,
+                    const folly::exception_wrapper &ew,
                     std::shared_ptr<pb::ServerName> server_name);
 
-  void LogException(int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests,
-                    std::shared_ptr<std::exception> &error,
+  void LogException(int32_t tries,
+                    const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+                    const folly::exception_wrapper &ew,
                     std::shared_ptr<pb::ServerName> server_name);
 
   const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name);
 
-  void AddError(const std::shared_ptr<Action> &action, std::shared_ptr<std::exception> error,
+  void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew,
                 std::shared_ptr<pb::ServerName> server_name);
 
   void AddError(const std::vector<std::shared_ptr<Action>> &actions,
-                std::shared_ptr<std::exception> error, std::shared_ptr<pb::ServerName> server_name);
+                const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
 
   void FailOne(const std::shared_ptr<Action> &action, int32_t tries,
-               std::shared_ptr<std::exception> error, int64_t current_time,
-               const std::string extras);
+               const folly::exception_wrapper &ew, int64_t current_time, const std::string extras);
 
   void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
-               std::shared_ptr<std::exception> error, std::shared_ptr<pb::ServerName> server_name);
+               const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
 
   void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
 
   void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec);
 
   void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
-               std::shared_ptr<std::exception> exc, std::shared_ptr<pb::ServerName> server_name);
+               const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name);
 
-  void TryResubmit(std::vector<std::shared_ptr<Action>> actions, int32_t tries);
+  void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries);
 
   folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations(
       const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns);
@@ -146,7 +147,7 @@ class AsyncBatchRpcRetryingCaller {
   folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse(
       const ActionsByServer &actions_by_server);
 
-  void Send(ActionsByServer &actions_by_server, int32_t tries);
+  void Send(const ActionsByServer &actions_by_server, int32_t tries);
 
   void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
                   const std::shared_ptr<pb::ServerName> server_name,
@@ -179,7 +180,6 @@ class AsyncBatchRpcRetryingCaller {
   std::shared_ptr<RpcClient> rpc_client_ = nullptr;
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr;
 
-  std::mutex multi_mutex_;
+  std::recursive_mutex multi_mutex_;
 };
-
-} /* namespace hbase */
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
new file mode 100644
index 0000000..c186276
--- /dev/null
+++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Memory.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <gtest/gtest.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <chrono>
+#include <functional>
+#include <string>
+
+#include "connection/rpc-client.h"
+#include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/client.h"
+#include "core/connection-configuration.h"
+#include "core/keyvalue-codec.h"
+#include "core/region-location.h"
+#include "core/result.h"
+#include "exceptions/exception.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::AsyncRpcRetryingCallerFactory;
+using hbase::AsyncConnection;
+using hbase::AsyncRegionLocator;
+using hbase::ConnectionConfiguration;
+using hbase::Configuration;
+using hbase::HBaseRpcController;
+using hbase::RegionLocation;
+using hbase::RegionLocateType;
+using hbase::RpcClient;
+using hbase::RequestConverter;
+using hbase::ResponseConverter;
+using hbase::Put;
+using hbase::TimeUtil;
+using hbase::Client;
+using hbase::security::User;
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+using namespace hbase;
+
+using folly::exception_wrapper;
+
+class AsyncBatchRpcRetryTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr;
+
+class AsyncRegionLocatorBase : public AsyncRegionLocator {
+ public:
+  AsyncRegionLocatorBase() {}
+  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
+      : region_location_(region_location) {}
+  virtual ~AsyncRegionLocatorBase() = default;
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
+                                                                     const std::string &row,
+                                                                     const RegionLocateType,
+                                                                     const int64_t) override {
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setValue(region_locations_.at(row));
+    return promise.getFuture();
+  }
+
+  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+  }
+
+  virtual void set_region_location(
+      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
+    for (auto reg_loc : reg_locs) {
+      region_locations_[reg_loc.first] = reg_loc.second;
+    }
+  }
+
+  void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
+  }
+
+ protected:
+  std::shared_ptr<RegionLocation> region_location_;
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
+  std::map<std::string, uint32_t> mtries_;
+  std::map<std::string, uint32_t> mnum_fails_;
+
+  void InitRetryMaps(uint32_t num_fails) {
+    if (mtries_.size() == 0 && mnum_fails_.size() == 0) {
+      for (auto reg_loc : region_locations_) {
+        mtries_[reg_loc.first] = 0;
+        mnum_fails_[reg_loc.first] = num_fails;
+      }
+    }
+  }
+};
+
+class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
+ public:
+  MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
+  explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockAsyncRegionLocator() {}
+};
+
+class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t counter_ = 0;
+  uint32_t num_fails_ = 0;
+  uint32_t tries_ = 0;
+
+ public:
+  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockWrongRegionAsyncRegionLocator() {}
+
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    auto &tries = mtries_[row];
+    auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    /* use a bogus (fixed) region name, simulating an invalid region */
+    auto result = std::make_shared<RegionLocation>("whatever-region-name",
+                                                   region_locations_.at(row)->region_info(),
+                                                   region_locations_.at(row)->server_name());
+    promise.setValue(result);
+    return promise.getFuture();
+  }
+};
+
+class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
+ private:
+  uint32_t tries_ = 0;
+  uint32_t num_fails_ = 0;
+  uint32_t counter_ = 0;
+
+ public:
+  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
+      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
+  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
+      : AsyncRegionLocatorBase(region_location) {}
+  virtual ~MockFailingAsyncRegionLocator() {}
+  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
+      const hbase::pb::TableName &tn, const std::string &row,
+      const RegionLocateType locate_type = RegionLocateType::kCurrent,
+      const int64_t locate_ns = 0) override {
+    InitRetryMaps(num_fails_);
+    auto &tries = mtries_[row];
+    auto &num_fails = mnum_fails_[row];
+    if (++tries > num_fails) {
+      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
+    }
+
+    folly::Promise<std::shared_ptr<RegionLocation>> promise;
+    promise.setException(std::runtime_error{"Failed to look up region location"});
+    return promise.getFuture();
+  }
+};
+
+class MockAsyncConnection : public AsyncConnection,
+                            public std::enable_shared_from_this<MockAsyncConnection> {
+ public:
+  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
+                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
+                      std::shared_ptr<RpcClient> rpc_client,
+                      std::shared_ptr<AsyncRegionLocator> region_locator)
+      : conn_conf_(conn_conf),
+        retry_timer_(retry_timer),
+        cpu_executor_(cpu_executor),
+        io_executor_(io_executor),
+        retry_executor_(retry_executor),
+        rpc_client_(rpc_client),
+        region_locator_(region_locator) {}
+  ~MockAsyncConnection() {}
+  void Init() {
+    caller_factory_ =
+        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
+  }
+
+  std::shared_ptr<Configuration> conf() override { return nullptr; }
+  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
+    return caller_factory_;
+  }
+  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
+  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
+    return retry_executor_;
+  }
+
+  void Close() override {}
+  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
+    return std::make_shared<HBaseRpcController>();
+  }
+
+ private:
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<ConnectionConfiguration> conn_conf_;
+  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
+  std::shared_ptr<RpcClient> rpc_client_;
+  std::shared_ptr<AsyncRegionLocator> region_locator_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
+};
+
+class MockRawAsyncTableImpl {
+ public:
+  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
+                                 std::shared_ptr<hbase::pb::TableName> tn)
+      : conn_(conn), tn_(tn) {}
+  virtual ~MockRawAsyncTableImpl() = default;
+
+  /* implement this in real RawAsyncTableImpl. */
+  folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets(
+      const std::vector<hbase::Get> &gets) {
+    /* init request caller builder */
+    auto builder = conn_->caller_factory()->Batch();
+
+    /* call with retry to get result */
+    auto async_caller =
+        builder->table(tn_)
+            ->actions(std::make_shared<std::vector<hbase::Get>>(gets))
+            ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout())
+            ->operation_timeout(conn_->connection_conf()->operation_timeout())
+            ->pause(conn_->connection_conf()->pause())
+            ->max_attempts(conn_->connection_conf()->max_retries())
+            ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count())
+            ->Build();
+
+    return async_caller->Call().then([async_caller](auto r) { return r; });
+  }
+
+ private:
+  std::shared_ptr<MockAsyncConnection> conn_;
+  std::shared_ptr<hbase::pb::TableName> tn_;
+};
+
+void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
+                  const std::string &table_name, bool split_regions, uint32_t tries = 3,
+                  uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 10000) {
+  std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string tableName = (split_regions) ? ("split-" + table_name) : table_name;
+  if (split_regions)
+    AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d", keys);
+  else
+    AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d");
+
+  // Create the TableName of the table under test
+  auto tn = folly::to<hbase::pb::TableName>(tableName);
+
+  // Create a client
+  Client client(*AsyncBatchRpcRetryTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  for (uint64_t i = 0; i < num_rows; i++) {
+    table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
+                                                         "value" + std::to_string(i)));
+  }
+
+  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
+  std::vector<hbase::Get> gets;
+  for (uint64_t i = 0; i < num_rows; ++i) {
+    auto row = "test" + std::to_string(i);
+    hbase::Get get(row);
+    gets.push_back(get);
+    region_locations[row] = table->GetRegionLocation(row);
+  }
+
+  /* init executors, codec, rpc client and retry timer */
+  auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io_executor_ = client.async_connection()->io_executor();
+  auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto codec = std::make_shared<hbase::KeyValueCodec>();
+  auto rpc_client =
+      std::make_shared<RpcClient>(io_executor_, codec, AsyncBatchRpcRetryTest::test_util->conf());
+  std::shared_ptr<folly::HHWheelTimer> retry_timer =
+      folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
+
+  /* init connection configuration */
+  auto connection_conf = std::make_shared<ConnectionConfiguration>(
+      TimeUtil::SecondsToNanos(20),                       // connect_timeout
+      TimeUtil::MillisToNanos(operation_timeout_millis),  // operation_timeout
+      TimeUtil::SecondsToNanos(60),                       // rpc_timeout
+      TimeUtil::MillisToNanos(100),                       // pause
+      tries,                                              // max retries
+      1);                                                 // start log errors count
+
+  /* set region locator */
+  region_locator->set_region_location(region_locations);
+
+  /* init hbase client connection */
+  auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
+                                                    io_executor_, retry_executor_, rpc_client,
+                                                    region_locator);
+  conn->Init();
+
+  /* create the mock table that issues the batched Gets */
+  auto tableImpl =
+      std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
+
+  auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis));
+
+  ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  uint32_t num = 0;
+  for (auto tresult : tresults) {
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      folly::exception_wrapper ew = tresult.exception();
+      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row();
+      throw ew;
+    }
+    ++num;
+  }
+
+  // Check the values; they should match what the Puts above wrote
+  ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
+  uint32_t i = 0;
+  for (; i < num_rows; ++i) {
+    ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
+                                        << " must not be empty";
+    EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
+    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
+  }
+
+  retry_timer->destroy();
+  table->Close();
+  client.Close();
+  retry_executor_->stop();
+  io_executor_->stop();
+  cpu_executor_->stop();
+}
+
+// Test successful case
+TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockAsyncRegionLocator>());
+  runMultiTest(region_locator, "table1", false);
+}
+
+// Tests the RPC failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+  runMultiTest(region_locator, "table2", false, 5);
+}
+
+// Tests the RPC failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", false));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  runMultiTest(region_locator, "table4", false);
+}
+
+// Tests the region location lookup failing 4 times with only 3 tries allowed, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", false, 3));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(6));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", false, 5, 100, 10000));
+}
+
+// Test successful case, with the table created using split keys
+TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockAsyncRegionLocator>());
+  runMultiTest(region_locator, "table1", true);
+}
+
+// Tests the RPC failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
+  runMultiTest(region_locator, "table2", true, 5);
+}
+
+// Tests the RPC failing 4 times, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockWrongRegionAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", true));
+}
+
+// Tests the region location lookup failing 3 times, then succeeding
+TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(3));
+  runMultiTest(region_locator, "table4", true);
+}
+
+// Tests the region location lookup failing 4 times with only 3 tries allowed, throwing an exception
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(4));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", true, 3));
+}
+
+// Tests hitting operation timeout, thus not retrying anymore
+TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
+  std::shared_ptr<AsyncRegionLocatorBase> region_locator(
+      std::make_shared<MockFailingAsyncRegionLocator>(6));
+  EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", true, 5, 100, 10000));
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index ba213bd..9efe0b6 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -435,29 +435,27 @@ TEST_F(ClientTest, PutsWithTimestamp) {
   client.Close();
 }
 
-TEST_F(ClientTest, MultiGets) {
-  // Using TestUtil to populate test data
-  ClientTest::test_util->CreateTable("t", "d");
-
-  // Create TableName and Row to be fetched from HBase
-  auto tn = folly::to<hbase::pb::TableName>("t");
-
-  // Create a client
-  hbase::Client client(*ClientTest::test_util->conf());
+void SetClientParams() {
+  ClientTest::test_util->conf()->SetInt("hbase.client.cpu.thread.pool.size", 6);
+  ClientTest::test_util->conf()->SetInt("hbase.client.operation.timeout", 600000);
+  ClientTest::test_util->conf()->SetInt("hbase.client.retries.number", 7);
+  ClientTest::test_util->conf()->SetInt("hbase.client.start.log.errors.counter", 1);
+}
 
-  // Get connection to HBase Table
-  auto table = client.Table(tn);
+void PerformPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client,
+                 const std::string &table_name) {
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
   ASSERT_TRUE(table) << "Unable to get connection to Table.";
-
-  uint64_t num_rows = 10000;
   // Perform Puts
   for (uint64_t i = 0; i < num_rows; i++) {
     table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
                                                          "value" + std::to_string(i)));
   }
+}
 
+void MakeGets(uint64_t num_rows, const std::string &row_prefix, std::vector<hbase::Get> &gets) {
   // Perform the Gets
-  std::vector<hbase::Get> gets;
   for (uint64_t i = 0; i < num_rows; ++i) {
     auto row = "test" + std::to_string(i);
     hbase::Get get(row);
@@ -465,9 +463,10 @@ TEST_F(ClientTest, MultiGets) {
   }
   gets.push_back(hbase::Get("test2"));
   gets.push_back(hbase::Get("testextra"));
+}
 
-  auto results = table->Get(gets);
-
+void TestMultiResults(uint64_t num_rows, const std::vector<std::shared_ptr<hbase::Result>> &results,
+                      const std::vector<hbase::Get> &gets) {
   // Test the values, should be same as in put executed on hbase shell
   ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty.";
 
@@ -483,6 +482,66 @@ TEST_F(ClientTest, MultiGets) {
 
   ++i;
   ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty";
+}
+
+TEST_F(ClientTest, MultiGets) {
+  std::string table_name = "t";
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable(table_name, "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  uint64_t num_rows = 50000;
+  PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  std::vector<hbase::Get> gets;
+  MakeGets(num_rows, "test", gets);
+
+  auto results = table->Get(gets);
+
+  TestMultiResults(num_rows, results, gets);
+
+  table->Close();
+  client.Close();
+}
+
+TEST_F(ClientTest, MultiGetsWithRegionSplits) {
+  // Using TestUtil to populate test data
+  std::vector<std::string> keys{"test0",   "test100", "test200", "test300", "test400",
+                                "test500", "test600", "test700", "test800", "test900"};
+  std::string table_name = "t";
+  ClientTest::test_util->CreateTable(table_name, "d", keys);
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+
+  SetClientParams();
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  uint64_t num_rows = 50000;
+  PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name);
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  std::vector<hbase::Get> gets;
+  MakeGets(num_rows, "test", gets);
+
+  auto results = table->Get(gets);
+
+  TestMultiResults(num_rows, results, gets);
 
   table->Close();
   client.Close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/multi-response.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc
index f620c98..a4c2108 100644
--- a/hbase-native-client/core/multi-response.cc
+++ b/hbase-native-client/core/multi-response.cc
@@ -18,6 +18,7 @@
  */
 
 #include "core/multi-response.h"
+#include <glog/logging.h>
 #include "core/region-result.h"
 
 using hbase::pb::RegionLoadStats;
@@ -36,35 +37,38 @@ int MultiResponse::Size() const {
 
 void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index,
                                     std::shared_ptr<Result> result,
-                                    std::shared_ptr<std::exception> exc) {
-  bool region_found = false;
-  for (auto itr = results_.begin(); itr != results_.end(); ++itr) {
-    if (itr->first == region_name) {
-      region_found = true;
-      itr->second->AddResultOrException(original_index, result, exc);
-      break;
-    }
-  }
-  if (!region_found) {
+                                    std::shared_ptr<folly::exception_wrapper> exc) {
+  auto itr = results_.find(region_name);
+  if (itr == results_.end()) {
     auto region_result = std::make_shared<RegionResult>();
     region_result->AddResultOrException(original_index, result, exc);
     results_[region_name] = region_result;
+  } else {
+    itr->second->AddResultOrException(original_index, result, exc);
   }
 }
 
 void MultiResponse::AddRegionException(const std::string& region_name,
-                                       std::shared_ptr<std::exception> exception) {
-  exceptions_[region_name] = exception;
+                                       std::shared_ptr<folly::exception_wrapper> exception) {
+  VLOG(8) << "Store Region Exception:- " << exception->what() << "; Region[" << region_name << "];";
+  bool region_found = false;
+  auto itr = exceptions_.find(region_name);
+  if (itr == exceptions_.end()) {
+    auto region_result = std::make_shared<folly::exception_wrapper>();
+    exceptions_[region_name] = exception;
+  } else {
+    itr->second = exception;
+  }
 }
 
-std::shared_ptr<std::exception> MultiResponse::RegionException(
+std::shared_ptr<folly::exception_wrapper> MultiResponse::RegionException(
     const std::string& region_name) const {
   auto find = exceptions_.at(region_name);
   return find;
 }
 
-const std::map<std::string, std::shared_ptr<std::exception> >& MultiResponse::RegionExceptions()
-    const {
+const std::map<std::string, std::shared_ptr<folly::exception_wrapper> >&
+MultiResponse::RegionExceptions() const {
   return exceptions_;
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/multi-response.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h
index 96883fd..d38cfd6 100644
--- a/hbase-native-client/core/multi-response.h
+++ b/hbase-native-client/core/multi-response.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <core/region-result.h>
+#include <folly/ExceptionWrapper.h>
 #include <exception>
 #include <map>
 #include <memory>
@@ -46,17 +47,18 @@ class MultiResponse {
    * @param resOrEx the result or error; will be empty for successful Put and Delete actions.
    */
   void AddRegionResult(const std::string& region_name, int32_t original_index,
-                       std::shared_ptr<Result> result, std::shared_ptr<std::exception> exc);
+                       std::shared_ptr<Result> result,
+                       std::shared_ptr<folly::exception_wrapper> exc);
 
   void AddRegionException(const std::string& region_name,
-                          std::shared_ptr<std::exception> exception);
+                          std::shared_ptr<folly::exception_wrapper> exception);
 
   /**
    * @return the exception for the region, if any. Null otherwise.
    */
-  std::shared_ptr<std::exception> RegionException(const std::string& region_name) const;
+  std::shared_ptr<folly::exception_wrapper> RegionException(const std::string& region_name) const;
 
-  const std::map<std::string, std::shared_ptr<std::exception>>& RegionExceptions() const;
+  const std::map<std::string, std::shared_ptr<folly::exception_wrapper>>& RegionExceptions() const;
 
   void AddStatistic(const std::string& region_name, std::shared_ptr<pb::RegionLoadStats> stat);
 
@@ -66,12 +68,12 @@ class MultiResponse {
 
  private:
   // map of regionName to map of Results by the original index for that Result
-  std::map<std::string, std::shared_ptr<hbase::RegionResult>> results_;
+  std::map<std::string, std::shared_ptr<RegionResult>> results_;
   /**
    * The server can send us a failure for the region itself, instead of individual failure.
    * It's a part of the protobuf definition.
    */
-  std::map<std::string, std::shared_ptr<std::exception>> exceptions_;
+  std::map<std::string, std::shared_ptr<folly::exception_wrapper>> exceptions_;
 };
 
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/region-result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc
index 05ab274..206c876 100644
--- a/hbase-native-client/core/region-result.cc
+++ b/hbase-native-client/core/region-result.cc
@@ -30,7 +30,7 @@ RegionResult::RegionResult() {}
 RegionResult::~RegionResult() {}
 
 void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
-                                        std::shared_ptr<std::exception> exc) {
+                                        std::shared_ptr<folly::exception_wrapper> exc) {
   auto index_found = result_or_excption_.find(index);
   if (index_found == result_or_excption_.end()) {
     result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/region-result.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h
index cfd9e5a..b961634 100644
--- a/hbase-native-client/core/region-result.h
+++ b/hbase-native-client/core/region-result.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <map>
 #include <memory>
 #include <string>
@@ -29,13 +30,13 @@
 namespace hbase {
 
 using ResultOrExceptionTuple =
-    std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<std::exception>>;
+    std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<folly::exception_wrapper>>;
 
 class RegionResult {
  public:
   RegionResult();
   void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
-                            std::shared_ptr<std::exception> exc);
+                            std::shared_ptr<folly::exception_wrapper> exc);
 
   void set_stat(std::shared_ptr<pb::RegionLoadStats> stat);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 4f9bfb1..960c487 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -18,7 +18,6 @@
  */
 
 #include "core/response-converter.h"
-
 #include <glog/logging.h>
 #include <stdexcept>
 #include <string>
@@ -125,8 +124,9 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
   return results;
 }
 
-std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req,
-                                                                    const Response& resp) {
+std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(
+    std::shared_ptr<Request> req, const Response& resp,
+    const ServerRequest::ActionsByRegion& actions_by_region) {
   auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
   auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
   VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
@@ -148,11 +148,10 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
 
     auto region_name = rs.value();
     if (action_result.has_exception()) {
-      if (action_result.exception().has_value()) {
-        auto exc = std::make_shared<hbase::IOException>(action_result.exception().value());
-        VLOG(8) << "Store Region Exception:- " << exc->what();
-        multi_response->AddRegionException(region_name, exc);
-      }
+      auto ew = ResponseConverter::GetRemoteException(action_result.exception());
+      VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
+              << region_name << "];";
+      multi_response->AddRegionException(region_name, ew);
       continue;
     }
 
@@ -163,14 +162,16 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
                                " for region " + actions.region().value());
     }
 
+    auto multi_actions = actions_by_region.at(region_name)->actions();
+    uint64_t multi_actions_num = 0;
     for (hbase::pb::ResultOrException roe : action_result.resultorexception()) {
       std::shared_ptr<Result> result;
-      std::shared_ptr<std::exception> exc;
+      std::shared_ptr<folly::exception_wrapper> ew;
       if (roe.has_exception()) {
-        if (roe.exception().has_value()) {
-          exc = std::make_shared<hbase::IOException>(roe.exception().value());
-          VLOG(8) << "Store ResultOrException:- " << exc->what();
-        }
+        auto ew = ResponseConverter::GetRemoteException(roe.exception());
+        VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
+                << region_name << "];";
+        multi_response->AddRegionException(region_name, ew);
       } else if (roe.has_result()) {
         result = ToResult(roe.result(), resp.cell_scanner());
       } else if (roe.has_service_result()) {
@@ -183,7 +184,11 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
         result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false,
                                           false, false);
       }
-      multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc);
+      // Store each result under the action's original index so the caller can match it back to
+      // the request that produced it, regardless of its position in this response.
+      multi_response->AddRegionResult(
+          region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew);
+      multi_actions_num++;
     }
   }
 
@@ -196,4 +201,21 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_
   }
   return multi_response;
 }
+
+std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException(
+    const hbase::pb::NameBytesPair& exc_resp) {
+  std::string what;
+  std::string exception_class_name = exc_resp.has_name() ? exc_resp.name() : "";
+  std::string stack_trace = exc_resp.has_value() ? exc_resp.value() : "";
+
+  what.append(exception_class_name).append(stack_trace);
+  auto remote_exception = std::make_unique<RemoteException>(what);
+  remote_exception->set_exception_class_name(exception_class_name)
+      ->set_stack_trace(stack_trace)
+      ->set_hostname("")
+      ->set_port(0);
+
+  return std::make_shared<folly::exception_wrapper>(
+      folly::make_exception_wrapper<RemoteException>(*remote_exception));
+}
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 2f8f279..edd4165 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -25,6 +25,7 @@
 #include "connection/response.h"
 #include "core/multi-response.h"
 #include "core/result.h"
+#include "core/server-request.h"
 #include "if/Client.pb.h"
 #include "serde/cell-scanner.h"
 
@@ -56,12 +57,15 @@ class ResponseConverter {
   static std::vector<std::shared_ptr<Result>> FromScanResponse(
       const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
 
-  static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
-                                                          const Response& resp);
+  static std::unique_ptr<hbase::MultiResponse> GetResults(
+      std::shared_ptr<Request> req, const Response& resp,
+      const ServerRequest::ActionsByRegion& actions_by_region);
 
  private:
   // Constructor not required. We have all static methods to extract response from PB messages.
   ResponseConverter();
+  static std::shared_ptr<folly::exception_wrapper> GetRemoteException(
+      const hbase::pb::NameBytesPair& exc_resp);
 };
 
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/server-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h
index 7f31c2b..85df9ed 100644
--- a/hbase-native-client/core/server-request.h
+++ b/hbase-native-client/core/server-request.h
@@ -44,8 +44,14 @@ class ServerRequest {
   void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location,
                           std::shared_ptr<Action> action) {
     auto region_name = region_location->region_name();
-    auto itr = actions_by_region_.at(region_name);
-    itr->AddAction(action);
+    auto search = actions_by_region_.find(region_name);
+    if (search == actions_by_region_.end()) {
+      auto region_request = std::make_shared<RegionRequest>(region_location);
+      actions_by_region_[region_name] = region_request;
+      actions_by_region_[region_name]->AddAction(action);
+    } else {
+      search->second->AddAction(action);
+    }
   }
 
   const ActionsByRegion &actions_by_region() const { return actions_by_region_; }


Mime
View raw message