kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject kudu git commit: KUDU-2191 (4/n): HMS Thrift client fault handling
Date Wed, 31 Jan 2018 21:20:35 GMT
Repository: kudu
Updated Branches:
  refs/heads/master dd57be8c1 -> b315d0ed0


KUDU-2191 (4/n): HMS Thrift client fault handling

This commit improves the new HMS Thrift client's ability to handle
faults. In particular:

- The client now uses send, receive, and connect timeouts so that a
  non-responsive HMS instance will not block the client indefinitely.
- The Thrift logging callback is hooked up to glog so that we get proper
  log messages from Thrift.
- The HmsClient class and method docs include information about behavior
  when errors are encountered.

In the part 2 review, Todd also brought up the prospect of creating a
wrapper Thrift socket or transport to inject slow log warning messages
automatically. I've held off doing this for now, because I haven't been
able to figure out a way to do that which can associate the slowness
with higher-level operations like 'create database', as opposed to
lower-level ones like 'socket write'. I've made sure to apply the slow
warning calls uniformly across the HmsClient methods, and I don't think
it will be too onerous to keep them consistent in the future.

Change-Id: Ic48bbb833bbae39b383ae239054b9710da3c746d
Reviewed-on: http://gerrit.cloudera.org:8080/8494
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b315d0ed
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b315d0ed
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b315d0ed

Branch: refs/heads/master
Commit: b315d0ed078f4ca8d26ece082c0bf90f23fe63c1
Parents: dd57be8
Author: Dan Burkert <danburkert@apache.org>
Authored: Wed Nov 1 10:42:13 2017 -0400
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Jan 31 21:02:51 2018 +0000

----------------------------------------------------------------------
 src/kudu/hms/hms_client-test.cc                 | 84 +++++++++++++++++++-
 src/kudu/hms/hms_client.cc                      | 39 +++++++--
 src/kudu/hms/hms_client.h                       | 44 ++++++++--
 src/kudu/hms/mini_hms.cc                        | 33 ++++++--
 src/kudu/hms/mini_hms.h                         | 12 +++
 .../mini-cluster/external_mini_cluster-test.cc  |  2 +-
 6 files changed, 191 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/hms/hms_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index 72214a9..9ec0f4c 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -28,6 +28,10 @@
 #include "kudu/hms/hive_metastore_constants.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/mini_hms.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -75,7 +79,7 @@ TEST_F(HmsClientTest, TestHmsOperations) {
   MiniHms hms;
   ASSERT_OK(hms.Start());
 
-  HmsClient client(hms.address());
+  HmsClient client(hms.address(), HmsClientOptions());
   ASSERT_OK(client.Start());
 
   // Create a database.
@@ -123,7 +127,7 @@ TEST_F(HmsClientTest, TestHmsOperations) {
   hive::Table altered_table(my_table);
   altered_table.tableName = new_table_name;
   altered_table.parameters[HmsClient::kKuduTableIdKey] = "bogus-table-id";
-  ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsRuntimeError());
+  ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsRemoteError());
 
   // Rename the table.
   altered_table.parameters[HmsClient::kKuduTableIdKey] = table_id;
@@ -157,7 +161,7 @@ TEST_F(HmsClientTest, TestHmsOperations) {
       << "Tables: " << tables;
 
   // Check that the HMS rejects Kudu table drops with a bogus table ID.
-  ASSERT_TRUE(DropTable(&client, database_name, new_table_name, "bogus-table-id").IsRuntimeError());
+  ASSERT_TRUE(DropTable(&client, database_name, new_table_name, "bogus-table-id").IsRemoteError());
   // Check that the HMS rejects non-existent table drops.
   ASSERT_TRUE(DropTable(&client, database_name, "foo-bar", "bogus-table-id").IsNotFound());
 
@@ -196,6 +200,80 @@ TEST_F(HmsClientTest, TestHmsOperations) {
   ASSERT_OK(client.Stop());
 }
 
+TEST_F(HmsClientTest, TestHmsFaultHandling) {
+  MiniHms hms;
+  ASSERT_OK(hms.Start());
+
+  HmsClientOptions options;
+  options.recv_timeout = MonoDelta::FromMilliseconds(500),
+  options.send_timeout = MonoDelta::FromMilliseconds(500);
+  HmsClient client(hms.address(), options);
+  ASSERT_OK(client.Start());
+
+  // Get a specific database.
+  hive::Database my_db;
+  ASSERT_OK(client.GetDatabase("default", &my_db));
+
+  // Shutdown the HMS.
+  ASSERT_OK(hms.Stop());
+  ASSERT_TRUE(client.GetDatabase("default", &my_db).IsNetworkError());
+  ASSERT_OK(client.Stop());
+
+  // Restart the HMS and ensure the client can connect.
+  ASSERT_OK(hms.Start());
+  ASSERT_OK(client.Start());
+  ASSERT_OK(client.GetDatabase("default", &my_db));
+
+  // Pause the HMS and ensure the client times-out appropriately.
+  ASSERT_OK(hms.Pause());
+  ASSERT_TRUE(client.GetDatabase("default", &my_db).IsTimedOut());
+
+  // Unpause the HMS and ensure the client can continue.
+  ASSERT_OK(hms.Resume());
+  ASSERT_OK(client.GetDatabase("default", &my_db));
+}
+
+// Test connecting the HMS client to TCP sockets in various invalid states.
+TEST_F(HmsClientTest, TestHmsConnect) {
+  Sockaddr loopback;
+  ASSERT_OK(loopback.ParseString("127.0.0.1", 0));
+
+  HmsClientOptions options;
+  options.recv_timeout = MonoDelta::FromMilliseconds(100),
+  options.send_timeout = MonoDelta::FromMilliseconds(100);
+  options.conn_timeout = MonoDelta::FromMilliseconds(100);
+
+  auto start_client = [&options] (Sockaddr addr) -> Status {
+    HmsClient client(HostPort(addr), options);
+    return client.Start();
+  };
+
+  // Listening, but not accepting socket.
+  Sockaddr listening;
+  Socket listening_socket;
+  ASSERT_OK(listening_socket.Init(0));
+  ASSERT_OK(listening_socket.BindAndListen(loopback, 1));
+  listening_socket.GetSocketAddress(&listening);
+  ASSERT_TRUE(start_client(listening).IsTimedOut());
+
+  // Bound, but not listening socket.
+  Sockaddr bound;
+  Socket bound_socket;
+  ASSERT_OK(bound_socket.Init(0));
+  ASSERT_OK(bound_socket.Bind(loopback));
+  bound_socket.GetSocketAddress(&bound);
+  ASSERT_TRUE(start_client(bound).IsNetworkError());
+
+  // Unbound socket.
+  Sockaddr unbound;
+  Socket unbound_socket;
+  ASSERT_OK(unbound_socket.Init(0));
+  ASSERT_OK(unbound_socket.Bind(loopback));
+  unbound_socket.GetSocketAddress(&unbound);
+  ASSERT_OK(unbound_socket.Close());
+  ASSERT_TRUE(start_client(unbound).IsNetworkError());
+}
+
 TEST_F(HmsClientTest, TestDeserializeJsonTable) {
   string json = R"#({"1":{"str":"table_name"},"2":{"str":"database_name"}})#";
   hive::Table table;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/hms/hms_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index 2ee87a8..c2c2a67 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <type_traits>
 #include <utility>
@@ -46,6 +47,7 @@ using apache::thrift::protocol::TJSONProtocol;
 using apache::thrift::transport::TBufferedTransport;
 using apache::thrift::transport::TMemoryBuffer;
 using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
 using std::make_shared;
 using std::shared_ptr;
 using std::string;
@@ -73,7 +75,14 @@ namespace hms {
   } catch (const hive::InvalidOperationException& e) { \
     return Status::IllegalState((msg), e.what()); \
   } catch (const hive::MetaException& e) { \
-    return Status::RuntimeError((msg), e.what()); \
+    return Status::RemoteError((msg), e.what()); \
+  } catch (const TTransportException& e) { \
+    switch (e.getType()) { \
+      case TTransportException::TIMED_OUT: return Status::TimedOut((msg), e.what()); \
+      case TTransportException::BAD_ARGS: return Status::InvalidArgument((msg), e.what());
\
+      case TTransportException::CORRUPTED_DATA: return Status::Corruption((msg), e.what());
\
+      default: return Status::NetworkError((msg), e.what()); \
+    } \
   } catch (const TException& e) { \
     return Status::IOError((msg), e.what()); \
   }
@@ -89,11 +98,30 @@ const char* const HmsClient::kDbNotificationListener =
 const char* const HmsClient::kKuduMetastorePlugin =
   "org.apache.kudu.hive.metastore.KuduMetastorePlugin";
 
-const int kSlowExecutionWarningThresholdMs = 500;
+const int kSlowExecutionWarningThresholdMs = 1000;
+
+namespace {
+// A logging callback for Thrift.
+//
+// Normally this would be defined in a more neutral location (e.g. Impala
+// defines it in thrift-util.cc), but since Hive is currently Kudu's only user
+// of Thrift, it's nice to have the log messages originate from hms_client.cc.
+void ThriftOutputFunction(const char* output) {
+  LOG(INFO) << output;
+}
+} // anonymous namespace
+
+HmsClient::HmsClient(const HostPort& hms_address, const HmsClientOptions& options)
+      : client_(nullptr) {
+  static std::once_flag set_thrift_logging_callback;
+  std::call_once(set_thrift_logging_callback, [] {
+      apache::thrift::GlobalOutput.setOutputFunction(ThriftOutputFunction);
+  });
 
-HmsClient::HmsClient(const HostPort& hms_address)
-    : client_(nullptr) {
   auto socket = make_shared<TSocket>(hms_address.host(), hms_address.port());
+  socket->setSendTimeout(options.send_timeout.ToMilliseconds());
+  socket->setRecvTimeout(options.recv_timeout.ToMilliseconds());
+  socket->setConnTimeout(options.conn_timeout.ToMilliseconds());
   auto transport = make_shared<TBufferedTransport>(std::move(socket));
   auto protocol = make_shared<TBinaryProtocol>(std::move(transport));
   client_ = hive::ThriftHiveMetastoreClient(std::move(protocol));
@@ -104,7 +132,7 @@ HmsClient::~HmsClient() {
 }
 
 Status HmsClient::Start() {
-  SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000 /* ms */, "starting HMS client");
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "starting HMS client");
   HMS_RET_NOT_OK(client_.getOutputProtocol()->getTransport()->open(),
                  "failed to open Hive MetaStore connection");
 
@@ -177,6 +205,7 @@ Status HmsClient::CreateTable(const hive::Table& table) {
 Status HmsClient::AlterTable(const std::string& database_name,
                              const std::string& table_name,
                              const hive::Table& table) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "alter HMS table");
   HMS_RET_NOT_OK(client_.alter_table(database_name, table_name, table),
                  "failed to alter Hive MetaStore table");
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index a5ed09f..0ce17f9 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -24,6 +24,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/hms/ThriftHiveMetastore.h"
 #include "kudu/hms/hive_metastore_types.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -41,20 +42,42 @@ enum class Cascade {
   kFalse,
 };
 
+struct HmsClientOptions {
+
+  // Thrift socket send timeout.
+  MonoDelta send_timeout = MonoDelta::FromSeconds(60);
+
+  // Thrift socket receive timeout.
+  MonoDelta recv_timeout = MonoDelta::FromSeconds(60);
+
+  // Thrift socket connect timeout.
+  MonoDelta conn_timeout = MonoDelta::FromSeconds(60);
+};
+
 // A client for the Hive MetaStore.
 //
 // All operations are synchronous, and may block.
 //
 // HmsClient is not thread safe.
 //
-// TODO(dan): this client is lacking adequate failure handling, including:
-//  - Documentation of specific Status codes returned in error scenarios
-//  - Connection timeouts
-//  - Handling and/or documentation of retry and reconnection behavior
+// HmsClient wraps a single TCP connection to an HMS, and does not attempt to
+// handle or retry on failure. It's expected that a higher-level component will
+// wrap HmsClient to provide retry, pooling, and HA deployment features if
+// necessary.
 //
-// TODO(dan): this client does not handle HA (multi) HMS deployments.
+// Note: Thrift provides a handy TSocketPool class which could be useful in
+// allowing the HmsClient to transparently handle connecting to a pool of HA HMS
+// instances. However, because TSocketPool handles choosing the instance during
+// socket connect, it can't determine if the remote endpoint is actually an HMS,
+// or just a random listening TCP socket. Nor can it do application-level checks
+// like ensuring that the connected HMS is configured with the Kudu Metastore
+// plugin. So, it's better for a higher-level construct to handle connecting to
+// HA deployments by wrapping multiple HmsClient instances. HmsClient punts on
+// handling connection retries, because the higher-level construct which is
+// handling HA deployments will naturally want to retry across HMS instances as
+// opposed to retrying repeatedly on a single instance.
 //
-// TODO(dan): this client does not handle Kerberized HMS deployments.
+// TODO(dan): this client does not yet handle Kerberized HMS deployments.
 class HmsClient {
  public:
 
@@ -66,13 +89,18 @@ class HmsClient {
   static const char* const kDbNotificationListener;
   static const char* const kKuduMetastorePlugin;
 
-  explicit HmsClient(const HostPort& hms_address);
+  // Create an HmsClient connection to the provided HMS Thrift RPC address.
+  //
+  // The individual timeouts may be set to enforce per-operation
+  // (read/write/connect) timeouts.
+  HmsClient(const HostPort& hms_address, const HmsClientOptions& options);
   ~HmsClient();
 
   // Starts the HMS client.
   //
   // This method will open a synchronous TCP connection to the HMS. If the HMS
-  // can not be reached, an error is returned.
+  // cannot be reached within the connection timeout interval, an error is
+  // returned.
   //
   // Must be called before any subsequent operations using the client.
   Status Start() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/hms/mini_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index 68ef4b1..c7de53d 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -49,11 +49,7 @@ namespace kudu {
 namespace hms {
 
 MiniHms::~MiniHms() {
-  if (hms_process_) {
-    VLOG(1) << "Stopping HMS";
-    unique_ptr<Subprocess> proc = std::move(hms_process_);
-    WARN_NOT_OK(proc->KillAndWait(SIGTERM), "failed to stop the Hive MetaStore process");
-  }
+  WARN_NOT_OK(Stop(), "Failed to stop MiniHms");
 }
 
 namespace {
@@ -112,7 +108,7 @@ Status MiniHms::Start() {
         Substitute("$0/bin/hive", hive_home),
         "--service", "metastore",
         "-v",
-        "-p", "0", // Use an ephemeral port.
+        "-p", std::to_string(port_),
   }));
 
   hms_process_->SetEnvVars(env_vars);
@@ -128,6 +124,31 @@ Status MiniHms::Start() {
   return wait;
 }
 
+Status MiniHms::Stop() {
+  if (hms_process_) {
+    VLOG(1) << "Stopping HMS";
+    unique_ptr<Subprocess> proc = std::move(hms_process_);
+    RETURN_NOT_OK_PREPEND(proc->KillAndWait(SIGTERM), "failed to stop the Hive MetaStore
process");
+  }
+  return Status::OK();
+}
+
+Status MiniHms::Pause() {
+  CHECK(hms_process_);
+  VLOG(1) << "Pausing HMS";
+  RETURN_NOT_OK_PREPEND(hms_process_->Kill(SIGSTOP),
+                        "failed to pause the Hive MetaStore process");
+  return Status::OK();
+}
+
+Status MiniHms::Resume() {
+  CHECK(hms_process_);
+  VLOG(1) << "Resuming HMS";
+  RETURN_NOT_OK_PREPEND(hms_process_->Kill(SIGCONT),
+                        "failed to unpause the Hive MetaStore process");
+  return Status::OK();
+}
+
 Status MiniHms::CreateHiveSite(const string& tmp_dir) const {
   // 'datanucleus.schema.autoCreateAll' and 'hive.metastore.schema.verification'
   // allow Hive to startup and run without first running the schemaTool.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/hms/mini_hms.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 2e5ac30..4bef966 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -46,8 +46,20 @@ class MiniHms {
   }
 
   // Starts the mini Hive metastore.
+  //
+  // If the MiniHms has already been started and stopped, it will be restarted
+  // using the same listening port.
   Status Start() WARN_UNUSED_RESULT;
 
+  // Stops the mini Hive metastore.
+  Status Stop() WARN_UNUSED_RESULT;
+
+  // Pauses the Hive metastore process by sending it SIGSTOP.
+  Status Pause() WARN_UNUSED_RESULT;
+
+  // Resumes a paused Hive metastore process by sending it SIGCONT.
+  Status Resume() WARN_UNUSED_RESULT;
+
   // Returns the address of the Hive metastore. Should only be called after the
   // metastore is started.
   HostPort address() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b315d0ed/src/kudu/mini-cluster/external_mini_cluster-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index 288e5df..c6787f4 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -204,7 +204,7 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
 
   // Verify that the HMS is reachable.
   if (opts.enable_hive_metastore) {
-    hms::HmsClient hms_client(cluster.hms()->address());
+    hms::HmsClient hms_client(cluster.hms()->address(), hms::HmsClientOptions());
     ASSERT_OK(hms_client.Start());
     vector<string> tables;
     ASSERT_OK(hms_client.GetAllTables("default", &tables));


Mime
View raw message