hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [hbase] 111/133: HBASE-18078. [C++] Harden RPC by handling various communication abnormalities
Date Tue, 12 Mar 2019 12:46:39 GMT
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 267fa0d218b27349337f111d9cba064158f01c80
Author: Xiaobing Zhou <xzhou@hortonworks.com>
AuthorDate: Mon Aug 7 17:44:59 2017 -0700

    HBASE-18078. [C++] Harden RPC by handling various communication abnormalities
    
    Signed-off-by: Enis Soztutar <enis@apache.org>
---
 .../connection/client-dispatcher.cc                | 16 ++++-
 hbase-native-client/connection/client-handler.cc   | 14 ++--
 .../connection/connection-factory.cc               | 29 +++++---
 hbase-native-client/connection/rpc-client.cc       | 49 ++++++++++++-
 hbase-native-client/connection/rpc-client.h        |  8 +++
 .../connection/rpc-test-server-handler.cc          |  3 +
 hbase-native-client/connection/rpc-test-server.cc  |  3 +
 hbase-native-client/connection/rpc-test.cc         | 84 +++++++++++++++++-----
 hbase-native-client/exceptions/exception.h         | 16 ++++-
 hbase-native-client/if/test_rpc_service.proto      |  1 +
 10 files changed, 186 insertions(+), 37 deletions(-)

diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index b9b2c34..d5d7f5f 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -18,8 +18,10 @@
  */
 #include "connection/client-dispatcher.h"
 #include <folly/ExceptionWrapper.h>
-
+#include <folly/Format.h>
+#include <folly/io/async/AsyncSocketException.h>
 #include <utility>
+#include "exceptions/exception.h"
 
 using std::unique_ptr;
 
@@ -31,6 +33,9 @@ void ClientDispatcher::read(Context *ctx, unique_ptr<Response> in) {
   auto call_id = in->call_id();
   auto p = requests_.find_and_erase(call_id);
 
+  VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}",
+                            in->call_id(), bool(in->exception()), in->exception().what());
+
   if (in->exception()) {
     p.setException(in->exception());
   } else {
@@ -51,7 +56,14 @@ folly::Future<unique_ptr<Response>> ClientDispatcher::operator()(unique_ptr<Requ
     LOG(ERROR) << "e = " << call_id;
     this->requests_.erase(call_id);
   });
-  this->pipeline_->write(std::move(arg));
+
+  try {
+    this->pipeline_->write(std::move(arg));
+  } catch (const folly::AsyncSocketException &e) {
+    p.setException(folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}});
+    /* clear folly::Promise to avoid overflow. */
+    requests_.erase(call_id);
+  }
 
   return f;
 }
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 39227d3..983a68c 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -21,8 +21,8 @@
 
 #include <folly/ExceptionWrapper.h>
 #include <folly/Likely.h>
+#include <folly/io/async/AsyncSocketException.h>
 #include <glog/logging.h>
-
 #include <string>
 
 #include "connection/request.h"
@@ -95,7 +95,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
                                              : "";
       std::string stack_trace =
           exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
-      what.append(exception_class_name).append(stack_trace);
+      what.append(stack_trace);
 
       auto remote_exception = std::make_unique<RemoteException>(what);
       remote_exception->set_exception_class_name(exception_class_name)
@@ -133,7 +133,13 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
   // Now store the call id to response.
   resp_msgs_->insert(std::make_pair(r->call_id(), r->resp_msg()));
 
-  // Send the data down the pipeline.
-  return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
+  try {
+    // Send the data down the pipeline.
+    return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
+  } catch (const folly::AsyncSocketException &e) {
+    /* clear protobuf::Message to avoid overflow. */
+    resp_msgs_->erase(r->call_id());
+    throw e;
+  }
 }
 }  // namespace hbase
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index a0c7f96..e763c03 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -22,11 +22,16 @@
 
 #include <chrono>
 
+#include <folly/ExceptionWrapper.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/async/AsyncSocketException.h>
+
 #include "connection/client-dispatcher.h"
 #include "connection/connection-factory.h"
 #include "connection/pipeline.h"
 #include "connection/sasl-handler.h"
 #include "connection/service.h"
+#include "exceptions/exception.h"
 
 using std::chrono::milliseconds;
 using std::chrono::nanoseconds;
@@ -56,15 +61,19 @@ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::M
 std::shared_ptr<HBaseService> ConnectionFactory::Connect(
     std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, const std::string &hostname,
     uint16_t port) {
-  // Yes this will block however it makes dealing with connection pool soooooo
-  // much nicer.
-  // TODO see about using shared promise for this.
-  auto pipeline = client
-                      ->connect(folly::SocketAddress(hostname, port, true),
-                                std::chrono::duration_cast<milliseconds>(connect_timeout_))
-                      .get();
-  auto dispatcher = std::make_shared<ClientDispatcher>();
-  dispatcher->setPipeline(pipeline);
-  return dispatcher;
+  try {
+    // Yes this will block however it makes dealing with connection pool soooooo
+    // much nicer.
+    // TODO see about using shared promise for this.
+    auto pipeline = client
+                        ->connect(folly::SocketAddress(hostname, port, true),
+                                  std::chrono::duration_cast<milliseconds>(connect_timeout_))
+                        .get();
+    auto dispatcher = std::make_shared<ClientDispatcher>();
+    dispatcher->setPipeline(pipeline);
+    return dispatcher;
+  } catch (const folly::AsyncSocketException &e) {
+    throw ConnectionException(folly::exception_wrapper{e});
+  }
 }
 }  // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 10faa7a..a16dca6 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -19,9 +19,12 @@
 
 #include "connection/rpc-client.h"
 
+#include <folly/Format.h>
 #include <folly/Logging.h>
+#include <folly/futures/Future.h>
 #include <unistd.h>
 #include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include "exceptions/exception.h"
 
 using hbase::security::User;
 using std::chrono::nanoseconds;
@@ -55,7 +58,7 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
                                                               std::unique_ptr<Request> req,
                                                               std::shared_ptr<User> ticket) {
   auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
-  return GetConnection(remote_id)->SendRequest(std::move(req));
+  return SendRequest(remote_id, std::move(req));
 }
 
 folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
@@ -64,7 +67,49 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
                                                               std::shared_ptr<User> ticket,
                                                               const std::string& service_name) {
   auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name);
-  return GetConnection(remote_id)->SendRequest(std::move(req));
+  return SendRequest(remote_id, std::move(req));
+}
+
+/**
+ * There are two cases for ConnectionException:
+ * 1. The first time connection
+ * establishment, i.e. GetConnection(remote_id), AsyncSocketException being a cause.
+ * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest, AsyncSocketException being
+ * a cause as well.
+ */
+folly::Future<std::unique_ptr<Response>> RpcClient::SendRequest(
+    std::shared_ptr<ConnectionId> remote_id, std::unique_ptr<Request> req) {
+  try {
+    return GetConnection(remote_id)
+        ->SendRequest(std::move(req))
+        .onError([&, this](const folly::exception_wrapper& ew) {
+          VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what());
+          ew.with_exception([&, this](const hbase::ConnectionException& re) {
+            /* bad connection, remove it from pool. */
+            cp_->Close(remote_id);
+          });
+          return GetFutureWithException(ew);
+        });
+  } catch (const ConnectionException& e) {
+    CHECK(e.cause().get_exception() != nullptr);
+    VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what());
+    /* bad connection, remove it from pool. */
+    cp_->Close(remote_id);
+    return GetFutureWithException(e);
+  }
+}
+
+template <typename EXCEPTION>
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(const EXCEPTION& e) {
+  return GetFutureWithException(folly::exception_wrapper{e});
+}
+
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(
+    const folly::exception_wrapper& ew) {
+  folly::Promise<std::unique_ptr<Response>> promise;
+  auto future = promise.getFuture();
+  promise.setException(ew);
+  return future;
 }
 
 std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 0ecde5b..8145be4 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -20,6 +20,7 @@
 
 #include <google/protobuf/service.h>
 
+#include <folly/ExceptionWrapper.h>
 #include <chrono>
 #include <memory>
 #include <string>
@@ -65,6 +66,13 @@ class RpcClient {
 
  private:
   std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id);
+  folly::Future<std::unique_ptr<Response>> SendRequest(std::shared_ptr<ConnectionId> remote_id,
+                                                       std::unique_ptr<Request> req);
+  template <typename EXCEPTION>
+  folly::Future<std::unique_ptr<Response>> GetFutureWithException(const EXCEPTION &e);
+
+  folly::Future<std::unique_ptr<Response>> GetFutureWithException(
+      const folly::exception_wrapper &ew);
 
  private:
   std::shared_ptr<ConnectionPool> cp_;
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
index 7f41b7e..8e405ef 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.cc
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -72,6 +72,9 @@ std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
   } else if (method_name == "addr") {
     result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
                                        std::make_shared<AddrResponseProto>(), method_name);
+  } else if (method_name == "socketNotOpen") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
   }
   return result;
 }
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index b9e1f13..f350d6a 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -88,6 +88,9 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
     // TODO:
   } else if (method_name == "addr") {
     // TODO:
+  } else if (method_name == "socketNotOpen") {
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    response->set_resp_msg(pb_resp_msg);
   }
 
   return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index 2949fe9..e7f678d 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -20,10 +20,12 @@
 #include <wangle/bootstrap/ClientBootstrap.h>
 #include <wangle/channel/Handler.h>
 
+#include <folly/Format.h>
 #include <folly/Logging.h>
 #include <folly/SocketAddress.h>
 #include <folly/String.h>
 #include <folly/experimental/TestUtil.h>
+#include <folly/io/async/AsyncSocketException.h>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -41,6 +43,9 @@ using namespace folly;
 using namespace hbase;
 
 DEFINE_int32(port, 0, "test server port");
+DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result");
+DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.",
+              "output format of enforcing fail");
 typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
 typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
 
@@ -91,9 +96,10 @@ TEST_F(RpcTest, Echo) {
   auto server_addr = GetRpcServerAddress(server);
   auto client = CreateRpcClient(conf);
 
-  std::string greetings = "hello, hbase server!";
+  auto method = "echo";
+  auto greetings = "hello, hbase server!";
   auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
-                                           std::make_shared<EchoResponseProto>(), "echo");
+                                           std::make_shared<EchoResponseProto>(), method);
   auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
   pb_msg->set_message(greetings);
 
@@ -101,14 +107,14 @@ TEST_F(RpcTest, Echo) {
   client
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
                   hbase::security::User::defaultUser())
-      .then([=](std::unique_ptr<Response> response) {
+      .then([&](std::unique_ptr<Response> response) {
         auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
         EXPECT_TRUE(pb_resp != nullptr);
-        VLOG(1) << "RPC echo returned: " + pb_resp->message();
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message());
         EXPECT_EQ(greetings, pb_resp->message());
       })
-      .onError([](const folly::exception_wrapper& ew) {
-        FAIL() << "Shouldn't get here, no exception is expected for RPC echo.";
+      .onError([&](const folly::exception_wrapper& ew) {
+        FAIL() << folly::sformat(FLAGS_fail_format, method);
       });
 
   server->stop();
@@ -118,23 +124,24 @@ TEST_F(RpcTest, Echo) {
 /**
  * test error
  */
-TEST_F(RpcTest, error) {
+TEST_F(RpcTest, Error) {
   auto conf = CreateConf();
   auto server = CreateRpcServer();
   auto server_addr = GetRpcServerAddress(server);
   auto client = CreateRpcClient(conf);
 
+  auto method = "error";
   auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
-                                           std::make_shared<EmptyResponseProto>(), "error");
+                                           std::make_shared<EmptyResponseProto>(), method);
   /* sending out request */
   client
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
                   hbase::security::User::defaultUser())
-      .then([=](std::unique_ptr<Response> response) {
-        FAIL() << "Shouldn't get here, exception is expected for RPC error.";
+      .then([&](std::unique_ptr<Response> response) {
+        FAIL() << folly::sformat(FLAGS_fail_format, method);
       })
-      .onError([](const folly::exception_wrapper& ew) {
-        VLOG(1) << "RPC error returned with exception.";
+      .onError([&](const folly::exception_wrapper& ew) {
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
         std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString();
         std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString();
 
@@ -142,14 +149,57 @@ TEST_F(RpcTest, error) {
         EXPECT_TRUE(bool(ew));
         EXPECT_EQ(kRemoteException, ew.class_name());
 
-        /* verify RemoteException */
-        EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) {
-          /* verify  DoNotRetryIOException*/
-          EXPECT_EQ(kRpcTestException, re.exception_class_name());
-          EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace());
+        /* verify exception */
+        EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) {
+          EXPECT_EQ(kRpcTestException, e.exception_class_name());
+          EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
         }));
       });
 
   server->stop();
   server->join();
 }
+
+TEST_F(RpcTest, SocketNotOpen) {
+  auto conf = CreateConf();
+  auto server = CreateRpcServer();
+  auto server_addr = GetRpcServerAddress(server);
+  auto client = CreateRpcClient(conf);
+
+  auto method = "socketNotOpen";
+  auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                           std::make_shared<EmptyResponseProto>(), method);
+
+  server->stop();
+  server->join();
+
+  /* sending out request */
+  client
+      ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([&](std::unique_ptr<Response> response) {
+        FAIL() << folly::sformat(FLAGS_fail_format, method);
+      })
+      .onError([&](const folly::exception_wrapper& ew) {
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
+        std::string kConnectionException =
+            demangle(typeid(hbase::ConnectionException)).toStdString();
+        std::string kAsyncSocketException =
+            demangle(typeid(folly::AsyncSocketException)).toStdString();
+
+        /* verify exception_wrapper */
+        EXPECT_TRUE(bool(ew));
+        EXPECT_EQ(kConnectionException, ew.class_name());
+
+        /* verify exception */
+        EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) {
+          EXPECT_TRUE(bool(e.cause()));
+          EXPECT_EQ(kAsyncSocketException, e.cause().class_name());
+          VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what());
+          e.cause().with_exception([&](const folly::AsyncSocketException& ase) {
+            EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType());
+            EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
+          });
+        }));
+      });
+}
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index bdedff4..bc3b291 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -59,7 +59,7 @@ class IOException : public std::logic_error {
   IOException(const std::string& what, bool do_not_retry)
       : logic_error(what), do_not_retry_(do_not_retry) {}
 
-  IOException(const std::string& what, folly::exception_wrapper cause)
+  IOException(const std::string& what, const folly::exception_wrapper& cause)
       : logic_error(what), cause_(cause), do_not_retry_(false) {}
 
   IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry)
@@ -67,7 +67,7 @@ class IOException : public std::logic_error {
 
   virtual ~IOException() = default;
 
-  virtual folly::exception_wrapper cause() { return cause_; }
+  virtual folly::exception_wrapper cause() const { return cause_; }
 
   bool do_not_retry() const { return do_not_retry_; }
 
@@ -115,6 +115,18 @@ class RetriesExhaustedException : public IOException {
   int32_t num_retries_;
 };
 
+class ConnectionException : public IOException {
+ public:
+  ConnectionException() {}
+
+  ConnectionException(const std::string& what) : IOException(what) {}
+
+  ConnectionException(const folly::exception_wrapper& cause) : IOException("", cause) {}
+
+  ConnectionException(const std::string& what, const folly::exception_wrapper& cause)
+      : IOException(what, cause) {}
+};
+
 class RemoteException : public IOException {
  public:
   RemoteException() : IOException(), port_(0) {}
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
index 5f91dc4..2730403 100644
--- a/hbase-native-client/if/test_rpc_service.proto
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -32,4 +32,5 @@ service TestProtobufRpcProto {
   rpc error(EmptyRequestProto) returns (EmptyResponseProto);
   rpc pause(PauseRequestProto) returns (EmptyResponseProto);
   rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+  rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto);
 }


Mime
View raw message