kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject kudu git commit: Extract connection retrying and HA support from HmsCatalog
Date Fri, 05 Oct 2018 21:42:58 GMT
Repository: kudu
Updated Branches:
  refs/heads/master f1f192ca3 -> 42b75278a


Extract connection retrying and HA support from HmsCatalog

Introduces a new abstraction, thrift::HaClient, for providing HA and
retrying support on top of Thrift clients like HmsClient and
SentryClient. The implementation is extracted from HmsCatalog, and is
not changed in any significant ways.

The implementation is inlined into a header because the class requires a
template parameter (to choose between HmsClient or SentryClient), and
forward declaring these instantiations is not possible because the
thrift module does not link to the hms or sentry modules.

No tests are provided for HaClient in isolation, instead it's tested
through existing and new failover tests of HaClient<HmsClient> and
HaClient<SentryClient>.

This patch should have little or no functional changes.

Change-Id: Id8135f4c3995bba0ba28384c35696a1771ff5296
Reviewed-on: http://gerrit.cloudera.org:8080/11570
Reviewed-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Andrew Wong <awong@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/42b75278
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/42b75278
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/42b75278

Branch: refs/heads/master
Commit: 42b75278a8d226b0235736aae735edeeeb55ab21
Parents: f1f192c
Author: Dan Burkert <danburkert@apache.org>
Authored: Tue Oct 2 14:05:00 2018 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Fri Oct 5 21:42:43 2018 +0000

----------------------------------------------------------------------
 src/kudu/hms/hms_catalog.cc           | 216 ++++--------------------
 src/kudu/hms/hms_catalog.h            |  33 +---
 src/kudu/hms/hms_client.cc            |   2 +
 src/kudu/hms/hms_client.h             |  23 +--
 src/kudu/sentry/sentry_client-test.cc |  29 +++-
 src/kudu/sentry/sentry_client.cc      |   2 +
 src/kudu/sentry/sentry_client.h       |  13 +-
 src/kudu/thrift/client.cc             |  16 +-
 src/kudu/thrift/client.h              | 256 ++++++++++++++++++++++++++++-
 9 files changed, 343 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/hms/hms_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index ae4d998..7620b2e 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -17,8 +17,6 @@
 
 #include "kudu/hms/hms_catalog.h"
 
-#include <algorithm>
-#include <functional>
 #include <iostream>
 #include <iterator>
 #include <map>
@@ -43,11 +41,10 @@
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/thrift/client.h"
-#include "kudu/util/async_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/threadpool.h"
 
 using boost::none;
 using boost::optional;
@@ -123,12 +120,7 @@ const char* const HmsCatalog::kInvalidTableError = "when the Hive Metastore
inte
     "identifier pair, each containing only ASCII alphanumeric characters, '_', and '/'";
 
 HmsCatalog::HmsCatalog(string master_addresses)
-    : master_addresses_(std::move(master_addresses)),
-      hms_client_(HostPort("", 0), thrift::ClientOptions()),
-      reconnect_after_(MonoTime::Now()),
-      reconnect_failure_(Status::OK()),
-      consecutive_reconnect_failures_(0),
-      reconnect_idx_(0) {
+    : master_addresses_(std::move(master_addresses)) {
 }
 
 HmsCatalog::~HmsCatalog() {
@@ -136,35 +128,32 @@ HmsCatalog::~HmsCatalog() {
 }
 
 Status HmsCatalog::Start() {
-  if (threadpool_) {
-    return Status::IllegalState("HMS Catalog is already started");
-  }
-
-  RETURN_NOT_OK(ParseUris(FLAGS_hive_metastore_uris, &hms_addresses_));
+  vector<HostPort> addresses;
+  RETURN_NOT_OK(ParseUris(FLAGS_hive_metastore_uris, &addresses));
 
-  // The thread pool must be capped at one thread to ensure serialized access to
-  // the fields of HmsCatalog.
-  RETURN_NOT_OK(ThreadPoolBuilder("hms-catalog")
-      .set_min_threads(1)
-      .set_max_threads(1)
-      .Build(&threadpool_));
+  thrift::ClientOptions options;
+  options.send_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_send_timeout);
+  options.recv_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_recv_timeout);
+  options.conn_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_conn_timeout);
+  options.enable_kerberos = FLAGS_hive_metastore_sasl_enabled;
+  options.service_principal = FLAGS_hive_metastore_kerberos_principal;
+  options.max_buf_size = FLAGS_hive_metastore_max_message_size;
+  options.retry_count = FLAGS_hive_metastore_retry_count;
 
-  return Status::OK();
+  return ha_client_.Start(std::move(addresses), std::move(options));
 }
 
 void HmsCatalog::Stop() {
-  if (threadpool_) {
-    threadpool_->Shutdown();
-  }
+  ha_client_.Stop();
 }
 
 Status HmsCatalog::CreateTable(const string& id,
                                const string& name,
                                optional<const string&> owner,
                                const Schema& schema) {
-  return Execute([&] (HmsClient* client) {
-      hive::Table table;
-      RETURN_NOT_OK(PopulateTable(id, name, owner, schema, master_addresses_, &table));
+  hive::Table table;
+  RETURN_NOT_OK(PopulateTable(id, name, owner, schema, master_addresses_, &table));
+  return ha_client_.Execute([&] (HmsClient* client) {
       return client->CreateTable(table, EnvironmentContext());
   });
 }
@@ -183,7 +172,7 @@ Status HmsCatalog::DropTable(const string& name, const hive::EnvironmentContext&
   Slice hms_database;
   Slice hms_table;
   RETURN_NOT_OK(ParseTableName(name, &hms_database, &hms_table));
-  return Execute([&] (HmsClient* client) {
+  return ha_client_.Execute([&] (HmsClient* client) {
     return client->DropTable(hms_database.ToString(), hms_table.ToString(), env_ctx);
   });
 }
@@ -192,7 +181,7 @@ Status HmsCatalog::UpgradeLegacyImpalaTable(const string& id,
                                             const string& db_name,
                                             const string& tb_name,
                                             const Schema& schema) {
-  return Execute([&] (HmsClient* client) {
+  return ha_client_.Execute([&] (HmsClient* client) {
     hive::Table table;
     RETURN_NOT_OK(client->GetTable(db_name, tb_name, &table));
     if (table.parameters[HmsClient::kStorageHandlerKey] !=
@@ -207,11 +196,11 @@ Status HmsCatalog::UpgradeLegacyImpalaTable(const string& id,
 }
 
 Status HmsCatalog::DowngradeToLegacyImpalaTable(const string& name) {
-  return Execute([&] (HmsClient* client) {
-    Slice hms_database;
-    Slice hms_table;
-    RETURN_NOT_OK(ParseTableName(name, &hms_database, &hms_table));
+  Slice hms_database;
+  Slice hms_table;
+  RETURN_NOT_OK(ParseTableName(name, &hms_database, &hms_table));
 
+  return ha_client_.Execute([&] (HmsClient* client) {
     hive::Table table;
     RETURN_NOT_OK(client->GetTable(hms_database.ToString(), hms_table.ToString(), &table));
     if (table.parameters[HmsClient::kStorageHandlerKey] !=
@@ -235,7 +224,7 @@ Status HmsCatalog::DowngradeToLegacyImpalaTable(const string& name)
{
 }
 
 Status HmsCatalog::GetKuduTables(vector<hive::Table>* kudu_tables) {
-  return Execute([&] (HmsClient* client) {
+  return ha_client_.Execute([&] (HmsClient* client) {
     vector<string> database_names;
     RETURN_NOT_OK(client->GetAllDatabases(&database_names));
     vector<string> table_names;
@@ -267,8 +256,11 @@ Status HmsCatalog::AlterTable(const string& id,
                               const string& name,
                               const string& new_name,
                               const Schema& schema) {
+  Slice hms_database;
+  Slice hms_table;
+  RETURN_NOT_OK(ParseTableName(name, &hms_database, &hms_table));
 
-  return Execute([&] (HmsClient* client) {
+  return ha_client_.Execute([&] (HmsClient* client) {
       // The HMS does not have a way to alter individual fields of a table
       // entry, so we must request the existing table entry from the HMS, update
       // the fields, and write it back. Otherwise we'd overwrite metadata fields
@@ -284,10 +276,6 @@ Status HmsCatalog::AlterTable(const string& id,
       // - The original table does not exist in the HMS
       // - The original table doesn't match the Kudu table being altered
 
-      Slice hms_database;
-      Slice hms_table;
-      RETURN_NOT_OK(ParseTableName(name, &hms_database, &hms_table));
-
       hive::Table table;
       RETURN_NOT_OK(client->GetTable(hms_database.ToString(), hms_table.ToString(), &table));
 
@@ -308,157 +296,11 @@ Status HmsCatalog::AlterTable(const string& id,
 
 Status HmsCatalog::GetNotificationEvents(int64_t last_event_id, int max_events,
                                          vector<hive::NotificationEvent>* events) {
-  return Execute([&] (HmsClient* client) {
+  return ha_client_.Execute([&] (HmsClient* client) {
     return client->GetNotificationEvents(last_event_id, max_events, events);
   });
 }
 
-template<typename Task>
-Status HmsCatalog::Execute(Task task) {
-  Synchronizer synchronizer;
-  auto callback = synchronizer.AsStdStatusCallback();
-
-  // TODO(todd): wrapping this in a TRACE_EVENT scope and a LOG_IF_SLOW and such
-  // would be helpful. Perhaps a TRACE message and/or a TRACE_COUNTER_INCREMENT
-  // too to keep track of how much time is spent in calls to HMS for a given
-  // CreateTable call. That will also require propagating the current Trace
-  // object into the 'Rpc' object. Note that the HmsClient class already has
-  // LOG_IF_SLOW calls internally.
-
-  RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
-    // The main run routine of the threadpool thread. Runs the task with
-    // exclusive access to the HMS client. If the task fails, it will be
-    // retried, unless the failure type is non-retriable or the maximum number
-    // of retries has been exceeded. Also handles re-connecting the HMS client
-    // after a fatal error.
-    //
-    // Since every task submitted to the (single thread) pool runs this, it's
-    // essentially a single iteration of a run loop which handles HMS client
-    // reconnection and task processing.
-    //
-    // Notes on error handling:
-    //
-    // There are three separate error scenarios below:
-    //
-    // * Error while (re)connecting the HMS client - This is considered a
-    // 'non-recoverable' error. The current task is immediately failed. In order
-    // to avoid hot-looping and hammering the HMS with reconnect attempts on
-    // every queued task, we set a backoff period. Any tasks which subsequently
-    // run during this backoff period are also immediately failed.
-    //
-    // * Task results in a fatal error - a fatal error is any error caused by a
-    // network or IO fault (not an application level failure). The HMS client
-    // will attempt to reconnect, and the task will be retried (up to a limit).
-    //
-    // * Task results in a non-fatal error - a non-fatal error is an application
-    // level error, and causes the task to be failed immediately (no retries).
-
-    // Keep track of the first attempt's failure. Typically the first failure is
-    // the most informative.
-    Status first_failure;
-
-    for (int attempt = 0; attempt <= FLAGS_hive_metastore_retry_count; attempt++) {
-      if (!hms_client_.IsConnected()) {
-        if (reconnect_after_ > MonoTime::Now()) {
-          // Not yet ready to attempt reconnection; fail the task immediately.
-          DCHECK(!reconnect_failure_.ok());
-          return callback(reconnect_failure_);
-        }
-
-        // Attempt to reconnect.
-        Status reconnect_status = Reconnect();
-        if (!reconnect_status.ok()) {
-          // Reconnect failed; retry with exponential backoff capped at 10s and
-          // fail the task. We don't bother with jitter here because only the
-          // leader master should be attempting this in any given period per
-          // cluster.
-          consecutive_reconnect_failures_++;
-          reconnect_after_ = MonoTime::Now() +
-              std::min(MonoDelta::FromMilliseconds(100 << consecutive_reconnect_failures_),
-                       MonoDelta::FromSeconds(10));
-          reconnect_failure_ = std::move(reconnect_status);
-          return callback(reconnect_failure_);
-        }
-
-        consecutive_reconnect_failures_ = 0;
-      }
-
-      // Execute the task.
-      Status task_status = task(&hms_client_);
-
-      // If the task succeeds, or it's a non-retriable error, return the result.
-      if (task_status.ok() || !IsFatalError(task_status)) {
-        return callback(task_status);
-      }
-
-      // A fatal error occurred. Tear down the connection, and try again. We
-      // don't log loudly here because odds are the reconnection will fail if
-      // it's a true fault, at which point we do log loudly.
-      VLOG(1) << "Call to HMS failed: " << task_status.ToString();
-
-      if (attempt == 0) {
-        first_failure = std::move(task_status);
-      }
-
-      WARN_NOT_OK(hms_client_.Stop(), "Failed to stop Hive Metastore client");
-    }
-
-    // We've exhausted the allowed retries.
-    DCHECK(!first_failure.ok());
-    LOG(WARNING) << "Call to HMS failed after " << FLAGS_hive_metastore_retry_count
-                 << " retries: " << first_failure.ToString();
-
-    return callback(first_failure);
-  }));
-
-  return synchronizer.Wait();
-}
-
-Status HmsCatalog::Reconnect() {
-  Status s;
-
-  thrift::ClientOptions options;
-  options.send_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_send_timeout);
-  options.recv_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_recv_timeout);
-  options.conn_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_conn_timeout);
-  options.enable_kerberos = FLAGS_hive_metastore_sasl_enabled;
-  options.service_principal = FLAGS_hive_metastore_kerberos_principal;
-  options.max_buf_size = FLAGS_hive_metastore_max_message_size;
-
-  // Try reconnecting to each HMS in sequence, returning the first one which
-  // succeeds. In order to avoid getting 'stuck' on a partially failed HMS, we
-  // remember which we connected to previously and try it last.
-  for (int i = 0; i < hms_addresses_.size(); i++) {
-    const auto& address = hms_addresses_[reconnect_idx_];
-    reconnect_idx_ = (reconnect_idx_ + 1) % hms_addresses_.size();
-
-    hms_client_ = HmsClient(address, options);
-    s = hms_client_.Start();
-    if (s.ok()) {
-      VLOG(1) << "Connected to Hive Metastore " << address.ToString();
-      return Status::OK();
-    }
-
-    WARN_NOT_OK(s, Substitute("Failed to connect to Hive Metastore ($0)", address.ToString()))
-  }
-
-  WARN_NOT_OK(hms_client_.Stop(), "Failed to stop Hive Metastore client");
-  return s;
-}
-
-bool HmsCatalog::IsFatalError(const Status& status) {
-  // Whitelist of errors which are not fatal. This errs on the side of
-  // considering an error fatal since the consequences are low; just an
-  // unnecessary reconnect. If a fatal error is not recognized it could cause
-  // another RPC to fail, since there is no way to check the status of the
-  // connection before sending an RPC.
-  return !(status.IsAlreadyPresent()
-        || status.IsNotFound()
-        || status.IsInvalidArgument()
-        || status.IsIllegalState()
-        || status.IsRemoteError());
-}
-
 namespace {
 
 string column_to_field_type(const ColumnSchema& column) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/hms/hms_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index a13f885..6139e28 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -24,19 +24,17 @@
 #include <boost/optional/optional.hpp>
 #include <gtest/gtest_prod.h>
 
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
+#include "kudu/thrift/client.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
+class HostPort;
 class Schema;
 class Slice;
-class ThreadPool;
 
 namespace hms {
 
@@ -170,22 +168,10 @@ class HmsCatalog {
   FRIEND_TEST(HmsCatalogStaticTest, TestParseTableNameSlices);
   FRIEND_TEST(HmsCatalogStaticTest, TestParseUris);
 
-  // Synchronously executes a task with exclusive access to the HMS client.
-  template<typename Task>
-  Status Execute(Task task) WARN_UNUSED_RESULT;
-
-  // Reconnects hms_client_ to an HMS, or returns an error if all HMS instances
-  // are unavailable.
-  Status Reconnect();
-
   // Drops a table entry from the HMS, supplying the provided environment context.
   Status DropTable(const std::string& name,
                    const hive::EnvironmentContext& env_ctx) WARN_UNUSED_RESULT;
 
-  // Returns true if the RPC status is 'fatal', e.g. the Thrift connection on
-  // which it occurred should be shut down.
-  static bool IsFatalError(const Status& status);
-
   // Parses a Kudu table name into a Hive database and table name.
   // Returns an error if the Kudu table name is not correctly formatted.
   // The returned HMS database and table slices must not outlive 'table_name'.
@@ -202,20 +188,7 @@ class HmsCatalog {
   // Kudu master addresses.
   const std::string master_addresses_;
 
-  // Initialized during Start().
-  std::vector<HostPort> hms_addresses_;
-  gscoped_ptr<ThreadPool> threadpool_;
-
-  // Fields only used by the threadpool thread:
-
-  // The HMS client.
-  hms::HmsClient hms_client_;
-
-  // Fields which track consecutive reconnection attempts and backoff.
-  MonoTime reconnect_after_;
-  Status reconnect_failure_;
-  int consecutive_reconnect_failures_;
-  int reconnect_idx_;
+  thrift::HaClient<hms::HmsClient> ha_client_;
 };
 
 } // namespace hms

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/hms/hms_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index 9d51956..3482163 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -122,6 +122,8 @@ const uint16_t HmsClient::kDefaultHmsPort = 9083;
 
 const int kSlowExecutionWarningThresholdMs = 1000;
 
+const char* const HmsClient::kServiceName = "Hive Metastore";
+
 HmsClient::HmsClient(const HostPort& address, const ClientOptions& options)
       : client_(hive::ThriftHiveMetastoreClient(CreateClientProtocol(address, options)))
{
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index 23451f3..da8cf7b 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -33,7 +33,7 @@ class HostPort;
 
 namespace thrift {
 struct ClientOptions;
-}
+} // namespace thrift
 
 namespace hms {
 
@@ -50,21 +50,9 @@ enum class Cascade {
 // HmsClient is not thread safe.
 //
 // HmsClient wraps a single TCP connection to an HMS, and does not attempt to
-// handle or retry on failure. It's expected that a higher-level component will
-// wrap HmsClient to provide retry, pooling, and HA deployment features if
-// necessary.
-//
-// Note: Thrift provides a handy TSocketPool class which could be useful in
-// allowing the HmsClient to transparently handle connecting to a pool of HA HMS
-// instances. However, because TSocketPool handles choosing the instance during
-// socket connect, it can't determine if the remote endpoint is actually an HMS,
-// or just a random listening TCP socket. Nor can it do application-level checks
-// like ensuring that the connected HMS is configured with the Kudu Metastore
-// plugin. So, it's better for a higher-level construct to handle connecting to
-// HA deployments by wrapping multiple HmsClient instances. HmsClient punts on
-// handling connection retries, because the higher-level construct which is
-// handling HA deployments will naturally want to retry across HMS instances as
-// opposed to retrying repeatedly on a single instance.
+// retry on failure. If higher-level features like thread-safety, retrying, and
+// HA support are needed then use thrift::HaClient<HmsClient> to wrap the HMS
+// client.
 class HmsClient {
  public:
 
@@ -91,6 +79,8 @@ class HmsClient {
 
   static const uint16_t kDefaultHmsPort;
 
+  static const char* const kServiceName;
+
   // Create an HmsClient connection to the provided HMS Thrift RPC address.
   HmsClient(const HostPort& address, const thrift::ClientOptions& options);
   ~HmsClient();
@@ -201,6 +191,5 @@ class HmsClient {
  private:
   hive::ThriftHiveMetastoreClient client_;
 };
-
 } // namespace hms
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/sentry/sentry_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/sentry/sentry_client-test.cc b/src/kudu/sentry/sentry_client-test.cc
index d738a4b..519e277 100644
--- a/src/kudu/sentry/sentry_client-test.cc
+++ b/src/kudu/sentry/sentry_client-test.cc
@@ -19,6 +19,7 @@
 
 #include <map>
 #include <string>
+#include <vector>
 
 #include <gtest/gtest.h>
 
@@ -27,11 +28,13 @@
 #include "kudu/sentry/mini_sentry.h"
 #include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using std::string;
+using std::vector;
 
 namespace kudu {
 namespace sentry {
@@ -45,15 +48,39 @@ class SentryClientTest : public KuduTest,
 };
 INSTANTIATE_TEST_CASE_P(KerberosEnabled, SentryClientTest, ::testing::Bool());
 
-TEST_P(SentryClientTest, TestMiniSentryLifecycle) {
+TEST_F(SentryClientTest, TestMiniSentryLifecycle) {
   MiniSentry mini_sentry;
   ASSERT_OK(mini_sentry.Start());
 
+  // Create an HA Sentry client and ensure it automatically reconnects after service interruption.
+  thrift::HaClient<SentryClient> client;
+
+  ASSERT_OK(client.Start(vector<HostPort>({ mini_sentry.address() }), thrift::ClientOptions()));
+
+  auto smoketest = [&] () -> Status {
+    return client.Execute([] (SentryClient* client) -> Status {
+        ::sentry::TCreateSentryRoleRequest create_req;
+        create_req.requestorUserName = "test-admin";
+        create_req.roleName = "test-role";
+        RETURN_NOT_OK(client->CreateRole(create_req));
+
+        ::sentry::TDropSentryRoleRequest drop_req;
+        drop_req.requestorUserName = "test-admin";
+        drop_req.roleName = "test-role";
+        RETURN_NOT_OK(client->DropRole(drop_req));
+        return Status::OK();
+    });
+  };
+
+  ASSERT_OK(smoketest());
+
   ASSERT_OK(mini_sentry.Stop());
   ASSERT_OK(mini_sentry.Start());
+  ASSERT_OK(smoketest());
 
   ASSERT_OK(mini_sentry.Pause());
   ASSERT_OK(mini_sentry.Resume());
+  ASSERT_OK(smoketest());
 }
 
 // Basic functionality test of the Sentry client. The goal is not an exhaustive

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/sentry/sentry_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/sentry/sentry_client.cc b/src/kudu/sentry/sentry_client.cc
index 530bd2e..a9d08db 100644
--- a/src/kudu/sentry/sentry_client.cc
+++ b/src/kudu/sentry/sentry_client.cc
@@ -51,6 +51,8 @@ namespace sentry {
 
 const uint16_t SentryClient::kDefaultSentryPort = 8038;
 
+const char* const SentryClient::kServiceName = "Sentry";
+
 const int kSlowExecutionWarningThresholdMs = 1000;
 
 namespace {

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/sentry/sentry_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/sentry/sentry_client.h b/src/kudu/sentry/sentry_client.h
index e5d9226..2956476 100644
--- a/src/kudu/sentry/sentry_client.h
+++ b/src/kudu/sentry/sentry_client.h
@@ -44,18 +44,17 @@ namespace sentry {
 //
 // SentryClient is not thread safe.
 //
-// SentryClient wraps a single TCP connection to a Sentry service instance, and
-// does not attempt to handle or retry on failure. It's expected that a
-// higher-level component will wrap SentryClient to provide retry, pooling, and
-// HA deployment features if necessary.
-//
-// Note: see HmsClient for why TSocketPool is not used for transparently
-// handling connections to Sentry HA instances.
+// SentryClient wraps a single TCP connection to an HMS, and does not attempt to
+// retry on failure. If higher-level features like thread-safety, retrying, and
+// HA support are needed then use thrift::HaClient<SentryClient> to wrap the HMS
+// client.
 class SentryClient {
  public:
 
   static const uint16_t kDefaultSentryPort;
 
+  static const char* const kServiceName;
+
   // Create a SentryClient connection to the provided Sentry service Thrift RPC address.
   SentryClient(const HostPort& address, const thrift::ClientOptions& options);
   ~SentryClient();

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/thrift/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/client.cc b/src/kudu/thrift/client.cc
index 9f4c29c..bce384e 100644
--- a/src/kudu/thrift/client.cc
+++ b/src/kudu/thrift/client.cc
@@ -20,7 +20,6 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
-#include <utility>
 
 #include <glog/logging.h>
 #include <thrift/TOutput.h>
@@ -30,6 +29,7 @@
 
 #include "kudu/thrift/sasl_client_transport.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
 
 namespace apache {
 namespace thrift {
@@ -44,8 +44,8 @@ using apache::thrift::protocol::TProtocol;
 using apache::thrift::transport::TBufferedTransport;
 using apache::thrift::transport::TSocket;
 using apache::thrift::transport::TTransport;
-using std::shared_ptr;
 using std::make_shared;
+using std::shared_ptr;
 
 namespace kudu {
 namespace thrift {
@@ -83,5 +83,17 @@ shared_ptr<TProtocol> CreateClientProtocol(const HostPort& address,
const Client
   return make_shared<TBinaryProtocol>(std::move(transport));
 }
 
+bool IsFatalError(const Status& error) {
+  // Whitelist of errors which are not fatal. This errs on the side of
+  // considering an error fatal since the consequences are low; just an
+  // unnecessary reconnect. If a fatal error is not recognized it could cause
+  // another RPC to fail, since there is no way to check the status of the
+  // connection before sending an RPC.
+  return !(error.IsAlreadyPresent()
+        || error.IsNotFound()
+        || error.IsInvalidArgument()
+        || error.IsIllegalState()
+        || error.IsRemoteError());
+}
 } // namespace thrift
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/42b75278/src/kudu/thrift/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/thrift/client.h b/src/kudu/thrift/client.h
index b1e21aa..b6ac791 100644
--- a/src/kudu/thrift/client.h
+++ b/src/kudu/thrift/client.h
@@ -19,11 +19,23 @@
 
 #pragma once
 
+#include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
+#include <vector>
 
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_util.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 namespace apache {
 namespace thrift {
@@ -34,9 +46,6 @@ class TProtocol;
 } // namespace apache
 
 namespace kudu {
-
-class HostPort;
-
 namespace thrift {
 
 // Options for a Thrift client connection.
@@ -62,10 +71,251 @@ struct ClientOptions {
   // Maximum size of objects which can be received on the Thrift connection.
   // Defaults to 100MiB to match Thrift TSaslTransport.receiveSaslMessage.
   int32_t max_buf_size = 100 * 1024 * 1024;
+
+  // Number of times an RPC is retried by the HA client after encountering
+  // retriable failures, such as network failures.
+  int32_t retry_count = 1;
 };
 
 std::shared_ptr<apache::thrift::protocol::TProtocol> CreateClientProtocol(
     const HostPort& address, const ClientOptions& options);
 
+// Returns 'true' if the error should result in the Thrift client being torn down.
+bool IsFatalError(const Status& error);
+
+// A wrapper class around a Thrift service client which provides support for HA
+// service configurations, retrying, backoff, and fault-tolerance.
+//
+// This client manages the lifecycle of the underlying Thrift clients,
+// automatically reconnecting on faults and retrying requests.
+//
+// This class is thread safe after Start() is called.
+template<typename Service>
+class HaClient {
+ public:
+
+  HaClient();
+  ~HaClient();
+
+  // Starts the highly available Thrift service client instance.
+  Status Start(std::vector<HostPort> addresses, ClientOptions options);
+
+  // Stops the highly available Thrift service client instance.
+  void Stop();
+
+  // Synchronously executes a task with exclusive access to the thrift service client.
+  Status Execute(std::function<Status(Service*)> task) WARN_UNUSED_RESULT;
+
+ private:
+
+  // Reconnects to an instance of the Thrift service, or returns an error if all
+  // service instances are unavailable.
+  Status Reconnect();
+
+  // Background thread which executes calls to the Thrift service.
+  gscoped_ptr<ThreadPool> threadpool_;
+
+  // Client options.
+  std::vector<HostPort> addresses_;
+  ClientOptions options_;
+
+  // The actual client service instance (HmsClient or SentryClient).
+  Service service_client_;
+
+  // Fields which track consecutive reconnection attempts and backoff.
+  MonoTime reconnect_after_;
+  Status reconnect_failure_;
+  int consecutive_reconnect_failures_;
+  int reconnect_idx_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// HaClient class definitions
+//
+// HaClient is defined inline so that it can be instantiated with HmsClient and
+// SentryClient as template parameters, which live in modules which are not
+// linked to the thrift module.
+///////////////////////////////////////////////////////////////////////////////
+
+template<typename Service>
+HaClient<Service>::HaClient()
+    : service_client_(HostPort("", 0), options_),
+      reconnect_after_(MonoTime::Now()),
+      reconnect_failure_(Status::OK()),
+      consecutive_reconnect_failures_(0),
+      reconnect_idx_(0) {
+}
+
+template<typename Service>
+HaClient<Service>::~HaClient() {
+  Stop();
+}
+
+template<typename Service>
+Status HaClient<Service>::Start(std::vector<HostPort> addresses, ClientOptions
options) {
+  if (threadpool_) {
+    return Status::IllegalState(strings::Substitute(
+          "$0 HA client is already started", Service::kServiceName));
+  }
+
+  addresses_ = std::move(addresses);
+  options_ = std::move(options);
+
+  // The thread pool must be capped at one thread to ensure serialized access to
+  // the fields of the service client (which isn't thread safe).
+  RETURN_NOT_OK(ThreadPoolBuilder(Service::kServiceName)
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .Build(&threadpool_));
+
+  return Status::OK();
+}
+
+template<typename Service>
+void HaClient<Service>::Stop() {
+  if (threadpool_) {
+    threadpool_->Shutdown();
+  }
+}
+
+template<typename Service>
+Status HaClient<Service>::Execute(std::function<Status(Service*)> task) {
+  Synchronizer synchronizer;
+  auto callback = synchronizer.AsStdStatusCallback();
+
+  // TODO(todd): wrapping this in a TRACE_EVENT scope and a LOG_IF_SLOW and such
+  // would be helpful. Perhaps a TRACE message and/or a TRACE_COUNTER_INCREMENT
+  // too to keep track of how much time is spent in calls to the Thrift client.
+  // That will also require propagating the current Trace object into the 'Rpc'
+  // object. Note that the Thrift client classes already have LOG_IF_SLOW calls
+  // internally.
+
+  RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
+    // The main run routine of the threadpool thread. Runs the task with
+    // exclusive access to the Thrift service client. If the task fails, it will
+    // be retried, unless the failure type is non-retriable or the maximum
+    // number of retries has been exceeded. Also handles re-connecting the
+    // Thrift service client after a fatal error.
+    //
+    // Since every task submitted to the (single thread) pool runs this, it's
+    // essentially a single iteration of a run loop which handles service client
+    // reconnection and task processing.
+    //
+    // Notes on error handling:
+    //
+    // There are three separate error scenarios below:
+    //
+    // * Error while (re)connecting the client - This is considered a
+    // 'non-recoverable' error. The current task is immediately failed. In order
+    // to avoid hot-looping and hammering the service with reconnect attempts on
+    // every queued task, we set a backoff period. Any tasks which subsequently
+    // run during this backoff period are also immediately failed.
+    //
+    // * Task results in a fatal error - a fatal error is any error caused by a
+    // network or IO fault (not an application level failure). The HA client
+    // will attempt to reconnect, and the task will be retried (up to a limit).
+    //
+    // * Task results in a non-fatal error - a non-fatal error is an application
+    // level error, and causes the task to be failed immediately (no retries).
+
+    // Keep track of the first attempt's failure. Typically the first failure is
+    // the most informative.
+    Status first_failure;
+
+    for (int attempt = 0; attempt <= options_.retry_count; attempt++) {
+      if (!service_client_.IsConnected()) {
+        if (reconnect_after_ > MonoTime::Now()) {
+          // Not yet ready to attempt reconnection; fail the task immediately.
+          DCHECK(!reconnect_failure_.ok());
+          return callback(reconnect_failure_);
+        }
+
+        // Attempt to reconnect.
+        Status reconnect_status = Reconnect();
+        if (!reconnect_status.ok()) {
+          // Reconnect failed; retry with exponential backoff capped at 10s and
+          // fail the task. We don't bother with jitter here because only the
+          // leader master should be attempting this in any given period per
+          // cluster.
+          consecutive_reconnect_failures_++;
+          reconnect_after_ = MonoTime::Now() +
+              std::min(MonoDelta::FromMilliseconds(100 << consecutive_reconnect_failures_),
+                       MonoDelta::FromSeconds(10));
+          reconnect_failure_ = std::move(reconnect_status);
+          return callback(reconnect_failure_);
+        }
+
+        consecutive_reconnect_failures_ = 0;
+      }
+
+      // Execute the task.
+      Status task_status = task(&service_client_);
+
+      // If the task succeeds, or it's a non-retriable error, return the result.
+      if (task_status.ok() || !IsFatalError(task_status)) {
+        return callback(task_status);
+      }
+
+      // A fatal error occurred. Tear down the connection, and try again. We
+      // don't log loudly here because odds are the reconnection will fail if
+      // it's a true fault, at which point we do log loudly.
+      VLOG(1) << strings::Substitute(
+          "Call to $0 failed: $1", Service::kServiceName, task_status.ToString());
+
+      if (attempt == 0) {
+        first_failure = std::move(task_status);
+      }
+
+      WARN_NOT_OK(service_client_.Stop(),
+                  strings::Substitute("Failed to stop $0 client", Service::kServiceName));
+    }
+
+    // We've exhausted the allowed retries.
+    DCHECK(!first_failure.ok());
+    LOG(WARNING) << strings::Substitute(
+        "Call to $0 failed after $1 retries: $2",
+        Service::kServiceName, options_.retry_count, first_failure.ToString());
+
+    return callback(first_failure);
+  }));
+
+  return synchronizer.Wait();
+}
+
+// Note: Thrift provides a handy TSocketPool class which could be useful in
+// allowing the Thrift client to transparently handle connecting to a pool of HA
+// service instances. However, because TSocketPool handles choosing the instance
+// during socket connect, it can't determine if the remote endpoint is actually
+// a Thrift service, or just a random listening TCP socket. Nor can it do
+// application-level checks like ensuring that the connected service is
+// configured correctly. So, it's better to handle reconnecting and failover in
+// this higher-level construct.
+template<typename Service>
+Status HaClient<Service>::Reconnect() {
+  Status s;
+
+  // Try reconnecting to each service instance in sequence, returning the first
+  // one which succeeds. In order to avoid getting 'stuck' on a partially failed
+  // instance, we remember which we connected to previously and try it last.
+  for (int i = 0; i < addresses_.size(); i++) {
+    const auto& address = addresses_[reconnect_idx_];
+    reconnect_idx_ = (reconnect_idx_ + 1) % addresses_.size();
+
+    service_client_ = Service(address, options_);
+    s = service_client_.Start();
+    if (s.ok()) {
+      VLOG(1) << strings::Substitute(
+          "Connected to $0 $1", Service::kServiceName, address.ToString());
+      return Status::OK();
+    }
+
+    WARN_NOT_OK(s, strings::Substitute("Failed to connect to $0 ($1)",
+          Service::kServiceName, address.ToString()))
+  }
+
+  WARN_NOT_OK(service_client_.Stop(),
+              strings::Substitute("Failed to stop $0 client", Service::kServiceName));
+  return s;
+}
 } // namespace thrift
 } // namespace kudu


Mime
View raw message