kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [2/2] kudu git commit: KUDU-2191 (12/n): Hive Metastore notification log event listener
Date Fri, 15 Jun 2018 17:22:07 GMT
KUDU-2191 (12/n): Hive Metastore notification log event listener

This commit adds a notification log event listener to the Hive Metastore
integration, and shifts how the catalog manager handles table metadata
when the HMS integration is enabled. The leader master now listens for
drop table and alter table events in the HMS, and applies them to the
Kudu catalog as necessary. The latest handled notification log event
index is recorded in the sys-catalog. When the HMS integration is
enabled, the HMS is considered the source of truth for the table
namespace.

As a result, rename and delete RPCs are handled specially to ensure they
are applied to the HMS first, and only once they are committed in the
HMS are they applied to the Kudu catalog, through the new notification
log listener. Alter table operations which include a table rename and
other changes are handled by first performing the rename, then
performing the remaining changes. As a result, the alter table operation
as a whole is no longer applied atomically.

Testing: this is a hard component to test, because it is necessarily
tied to the catalog manager. I've added tests for specific scenarios in
master_hms-itest; however, I'm aware that not all codepaths in the
listener are covered. master-stress-test and alter_table-randomized-test
are now parameterized to run with the HMS integration enabled, which has
been effective at revealing issues.

The HMS integration has known issues with capital letters in table
names. A follow-up commit will introduce a fix and tests for this issue.

Change-Id: I32ed099c44a593ffe514152135957018f21ed775
Reviewed-on: http://gerrit.cloudera.org:8080/8313
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5f1ca32f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5f1ca32f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5f1ca32f

Branch: refs/heads/master
Commit: 5f1ca32f347d9ae76e34dc3ea8a3004de27de5b9
Parents: b5f3d1a
Author: Dan Burkert <danburkert@apache.org>
Authored: Fri Sep 22 15:04:46 2017 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Fri Jun 15 17:15:41 2018 +0000

----------------------------------------------------------------------
 src/kudu/hms/hms_catalog.cc                     |  15 +
 src/kudu/hms/hms_catalog.h                      |  10 +-
 src/kudu/hms/hms_client-test.cc                 |   9 +-
 src/kudu/hms/mini_hms.cc                        |  24 +-
 src/kudu/integration-tests/CMakeLists.txt       |  11 +-
 .../alter_table-randomized-test.cc              |  11 +-
 .../integration-tests/master-stress-test.cc     |  21 +-
 .../integration-tests/master_failover-itest.cc  |  35 +-
 src/kudu/integration-tests/master_hms-itest.cc  | 213 ++++++----
 src/kudu/master/CMakeLists.txt                  |   2 +
 src/kudu/master/catalog_manager.cc              | 339 ++++++++++-----
 src/kudu/master/catalog_manager.h               |  74 +++-
 .../hms_notification_log_listener-test.cc       | 104 +++++
 .../master/hms_notification_log_listener.cc     | 407 +++++++++++++++++++
 src/kudu/master/hms_notification_log_listener.h | 157 +++++++
 src/kudu/master/master.proto                    |   6 +
 src/kudu/master/sys_catalog.cc                  |  38 ++
 src/kudu/master/sys_catalog.h                   |  14 +
 src/kudu/util/test_util.cc                      |   3 +-
 19 files changed, 1267 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/hms/hms_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 944eecd..a45872f 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -237,6 +237,13 @@ Status HmsCatalog::AlterTable(const string& id,
   });
 }
 
+Status HmsCatalog::GetNotificationEvents(int64_t last_event_id, int max_events,
+                                         vector<hive::NotificationEvent>* events) {
+  return Execute([&] (HmsClient* client) {
+    return client->GetNotificationEvents(last_event_id, max_events, events);
+  });
+}
+
 template<typename Task>
 Status HmsCatalog::Execute(Task task) {
   Synchronizer synchronizer;
@@ -427,6 +434,14 @@ Status HmsCatalog::PopulateTable(const string& id,
   table->parameters[HmsClient::kKuduTableIdKey] = id;
   table->parameters[HmsClient::kKuduMasterAddrsKey] = master_addresses;
   table->parameters[HmsClient::kStorageHandlerKey] = HmsClient::kKuduStorageHandler;
+  // Workaround for HIVE-19253.
+  table->parameters[HmsClient::kExternalTableKey] = "TRUE";
+
+  // Set the table type to external so that the table's (HD)FS directory will
+  // not be deleted when the table is dropped. Deleting the directory is
+  // unnecessary, and causes a race in the HMS between concurrent DROP TABLE and
+  // CREATE TABLE operations on existing tables.
+  table->tableType = HmsClient::kExternalTable;
 
   // Remove the deprecated Kudu-specific field 'kudu.table_name'.
   EraseKeyReturnValuePtr(&table->parameters, HmsClient::kLegacyKuduTableNameKey);

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/hms/hms_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index 0f0c690..59ac411 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstdint>
 #include <string>
 #include <vector>
 
@@ -30,7 +31,7 @@
 #include "kudu/util/status.h"
 
 namespace hive {
-class EnvironmentContext;
+class NotificationEvent;
 class Table;
 }
 
@@ -99,6 +100,13 @@ class HmsCatalog {
   // This method will fail if the HMS is unreachable.
   Status RetrieveTables(std::vector<hive::Table>* hms_tables) WARN_UNUSED_RESULT;
 
+  // Retrieves notification log events from the HMS.
+  //
+  // The events will begin at id 'last_event_id + 1', and at most 'max_events'
+  // events are returned.
+  Status GetNotificationEvents(int64_t last_event_id, int max_events,
+                               std::vector<hive::NotificationEvent>* events);
+
   // Validates the hive_metastore_uris gflag.
   static bool ValidateUris(const char* flag_name, const std::string& metastore_uris);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/hms/hms_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index 940d66d..8e1dcd7 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -63,12 +63,13 @@ class HmsClientTest : public KuduTest,
     hive::Table table;
     table.dbName = database_name;
     table.tableName = table_name;
-    table.tableType = HmsClient::kManagedTable;
+    table.tableType = HmsClient::kExternalTable;
 
     table.__set_parameters({
         make_pair(HmsClient::kKuduTableIdKey, table_id),
         make_pair(HmsClient::kKuduMasterAddrsKey, string("TODO")),
         make_pair(HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler),
+        make_pair(HmsClient::kExternalTableKey, "TRUE")
     });
 
     hive::EnvironmentContext env_ctx;
@@ -168,7 +169,7 @@ TEST_P(HmsClientTest, TestHmsOperations) {
   EXPECT_EQ(table_name, my_table.tableName);
   EXPECT_EQ(table_id, my_table.parameters[HmsClient::kKuduTableIdKey]);
   EXPECT_EQ(HmsClient::kKuduStorageHandler, my_table.parameters[HmsClient::kStorageHandlerKey]);
-  EXPECT_EQ(HmsClient::kManagedTable, my_table.tableType);
+  EXPECT_EQ(HmsClient::kExternalTable, my_table.tableType);
 
   string new_table_name = "my_altered_table";
 
@@ -193,7 +194,7 @@ TEST_P(HmsClientTest, TestHmsOperations) {
   EXPECT_EQ(table_id, renamed_table.parameters[HmsClient::kKuduTableIdKey]);
   EXPECT_EQ(HmsClient::kKuduStorageHandler,
             renamed_table.parameters[HmsClient::kStorageHandlerKey]);
-  EXPECT_EQ(HmsClient::kManagedTable, renamed_table.tableType);
+  EXPECT_EQ(HmsClient::kExternalTable, renamed_table.tableType);
 
   // Create a table with an uppercase name.
   string uppercase_table_name = "my_UPPERCASE_Table";
@@ -286,7 +287,7 @@ TEST_P(HmsClientTest, TestLargeObjects) {
   hive::Table table;
   table.dbName = database_name;
   table.tableName = table_name;
-  table.tableType = HmsClient::kManagedTable;
+  table.tableType = HmsClient::kExternalTable;
   hive::FieldSchema partition_key;
   partition_key.name = "c1";
   partition_key.type = "int";

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/hms/mini_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index aabea83..2ce2ea9 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -117,17 +117,33 @@ Status MiniHms::Start() {
 
   // Comma-separated list of additional jars to add to the HMS classpath.
   string aux_jars = Substitute("$0/hms-plugin.jar", bin_dir);
+
+  // List of JVM environment options to pass to the HMS.
+  string java_options =
+    // Make logging less verbose.
+    "-Dhive.log.level=WARN "
+    // Log to the console.
+    "-Dhive.root.logger=console "
+    // Tune down the Derby deadlock timeout. The HMS's use of Derby with the
+    // NOTIFICATION_SEQUENCE table is prone to deadlocks, at which point Derby
+    // cancels a conflicting transaction after waiting out the timeout. This
+    // typically doesn't cause issues since the HMS auto retries these
+    // transactions, however the default period of 20 seconds causes tests to
+    // timeout.
+    "-Dderby.locks.deadlockTimeout=1";
+
+  if (!krb5_conf_.empty()) {
+    java_options += Substitute(" -Djava.security.krb5.conf=$0", krb5_conf_);
+  }
+
   map<string, string> env_vars {
       { "JAVA_HOME", java_home },
       { "HADOOP_HOME", hadoop_home },
       { "HIVE_AUX_JARS_PATH", aux_jars },
       { "HIVE_CONF_DIR", tmp_dir },
-      { "JAVA_TOOL_OPTIONS",  "-Dhive.log.level=WARN -Dhive.root.logger=console" },
+      { "JAVA_TOOL_OPTIONS", java_options },
       { "HADOOP_CONF_DIR", tmp_dir },
   };
-  if (!krb5_conf_.empty()) {
-    env_vars["JAVA_TOOL_OPTIONS"] += Substitute(" -Djava.security.krb5.conf=$0", krb5_conf_);
-  }
 
   // Start the HMS.
   hms_process_.reset(new Subprocess({

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index f5ed494..07bc6cf 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -47,16 +47,15 @@ target_link_libraries(itest_util
   kudu_tools_util
   security_test_util)
 add_dependencies(itest_util
-  kudu-tserver
-  kudu-master)
+  kudu-master
+  kudu-tserver)
 
 # Tests
 set(KUDU_TEST_LINK_LIBS itest_util ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(all_types-itest
   PROCESSORS 4
   NUM_SHARDS 8)
-ADD_KUDU_TEST(alter_table-randomized-test
-  PROCESSORS 4)
+ADD_KUDU_TEST(alter_table-randomized-test NUM_SHARDS 2 PROCESSORS 4)
 ADD_KUDU_TEST(alter_table-test PROCESSORS 3)
 ADD_KUDU_TEST(authn_token_expire-itest)
 ADD_KUDU_TEST(catalog_manager_tsk-itest PROCESSORS 2)
@@ -81,7 +80,9 @@ ADD_KUDU_TEST(heavy-update-compaction-itest RUN_SERIAL true)
 ADD_KUDU_TEST(linked_list-test RUN_SERIAL true)
 ADD_KUDU_TEST(log-rolling-itest)
 ADD_KUDU_TEST(master_cert_authority-itest PROCESSORS 2)
-ADD_KUDU_TEST(master_failover-itest PROCESSORS 3)
+ADD_KUDU_TEST(master_failover-itest NUM_SHARDS 4 PROCESSORS 3)
+ADD_KUDU_TEST_DEPENDENCIES(master_failover-itest
+  kudu)
 ADD_KUDU_TEST(master_hms-itest RUN_SERIAL true PROCESSORS 4)
 ADD_KUDU_TEST(master_migration-itest)
 ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/integration-tests/alter_table-randomized-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-randomized-test.cc b/src/kudu/integration-tests/alter_table-randomized-test.cc
index 89d2371..e1739e8 100644
--- a/src/kudu/integration-tests/alter_table-randomized-test.cc
+++ b/src/kudu/integration-tests/alter_table-randomized-test.cc
@@ -93,14 +93,15 @@ const vector <KuduColumnStorageAttributes::EncodingType> kInt32Encodings =
 const vector<int32_t> kBlockSizes = {0, 2 * 1024 * 1024,
                                      4 * 1024 * 1024, 8 * 1024 * 1024};
 
-class AlterTableRandomized : public KuduTest {
+class AlterTableRandomized : public KuduTest,
+                             public ::testing::WithParamInterface<HmsMode> {
  public:
   void SetUp() override {
     KuduTest::SetUp();
 
     ExternalMiniClusterOptions opts;
     opts.num_tablet_servers = 3;
-    opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
+    opts.hms_mode = GetParam();
     // This test produces tables with lots of columns. With container preallocation,
     // we end up using quite a bit of disk space. So, we disable it.
     opts.extra_tserver_flags.emplace_back("--log_container_preallocate_bytes=0");
@@ -138,6 +139,10 @@ class AlterTableRandomized : public KuduTest {
   shared_ptr<KuduClient> client_;
 };
 
+// Run the test with the HMS integration enabled and disabled.
+INSTANTIATE_TEST_CASE_P(HmsConfigurations, AlterTableRandomized,
+                        ::testing::Values(HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION));
+
 struct RowState {
   // We use this special value to denote NULL values.
   // We ensure that we never insert or update to this value except in the case of
@@ -696,7 +701,7 @@ struct MirrorTable {
 // During the sequence of operations, a "mirror" of the table in memory is kept up to
 // date. We periodically scan the actual table, and ensure that the data in Kudu
 // matches our in-memory "mirror".
-TEST_F(AlterTableRandomized, TestRandomSequence) {
+TEST_P(AlterTableRandomized, TestRandomSequence) {
   MirrorTable t(client_);
   ASSERT_OK(t.Create());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/integration-tests/master-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master-stress-test.cc b/src/kudu/integration-tests/master-stress-test.cc
index 9dfd600..e5ec7ad 100644
--- a/src/kudu/integration-tests/master-stress-test.cc
+++ b/src/kudu/integration-tests/master-stress-test.cc
@@ -32,6 +32,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
+#include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -96,7 +97,8 @@ using tools::LeaderMasterProxy;
 
 static const MonoDelta kDefaultAdminTimeout = MonoDelta::FromSeconds(300);
 
-class MasterStressTest : public KuduTest {
+class MasterStressTest : public KuduTest,
+                         public ::testing::WithParamInterface<HmsMode> {
  public:
   MasterStressTest()
     : done_(1),
@@ -120,10 +122,9 @@ class MasterStressTest : public KuduTest {
     opts.num_masters = 3;
     opts.num_tablet_servers = 3;
 
-    // TODO(dan): enable HMS integration. Currently the test fails when it's
-    // enabled because the HMS and Kudu catalogs become unsynchronized when
-    // masters are killed while operating on the catalog.
-    // opts.enable_hive_metastore = true;
+    opts.hms_mode = GetParam();
+    // Tune down the notification log poll period in order to speed up catalog convergence.
+    opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
 
     // Don't preallocate log segments, since we're creating many tablets here.
     // If each preallocates 64M or so, we use a ton of disk space in this
@@ -156,9 +157,6 @@ class MasterStressTest : public KuduTest {
     // the global operation timeout.
     builder.default_admin_operation_timeout(kDefaultAdminTimeout);
 
-    // Encourage the client to switch masters quickly in the event of failover.
-    builder.default_rpc_timeout(MonoDelta::FromSeconds(1));
-
     ASSERT_OK(cluster_->CreateClient(&builder, &client_));
 
     // Populate the ts_map_ for the ReplaceTablet thread.
@@ -433,7 +431,11 @@ class MasterStressTest : public KuduTest {
   std::unordered_map<string, itest::TServerDetails*> ts_map_;
 };
 
-TEST_F(MasterStressTest, Test) {
+// Run the test with the HMS integration enabled and disabled.
+INSTANTIATE_TEST_CASE_P(HmsConfigurations, MasterStressTest,
+                        ::testing::Values(HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION));
+
+TEST_P(MasterStressTest, Test) {
   OverrideFlagForSlowTests("num_create_table_threads", "10");
   OverrideFlagForSlowTests("num_alter_table_threads", "5");
   OverrideFlagForSlowTests("num_delete_table_threads", "5");
@@ -477,5 +479,4 @@ TEST_F(MasterStressTest, Test) {
   LOG(INFO) << "Tablets replaced: " << num_tablets_replaced_.Load();
   LOG(INFO) << "Masters restarted: " << num_masters_restarted_.Load();
 }
-
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/integration-tests/master_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_failover-itest.cc b/src/kudu/integration-tests/master_failover-itest.cc
index 54248ef..cb1af80 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -71,7 +71,8 @@ using std::vector;
 using strings::Split;
 using strings::Substitute;
 
-class MasterFailoverTest : public KuduTest {
+class MasterFailoverTest : public KuduTest,
+                           public ::testing::WithParamInterface<HmsMode> {
  public:
   enum CreateTableMode {
     kWaitForCreate = 0,
@@ -81,7 +82,7 @@ class MasterFailoverTest : public KuduTest {
   MasterFailoverTest() {
     opts_.num_masters = 3;
     opts_.num_tablet_servers = kNumTabletServerReplicas;
-    opts_.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
+    opts_.hms_mode = GetParam();
 
     // Reduce various timeouts below as to make the detection of
     // leader master failures (specifically, failures as result of
@@ -156,11 +157,15 @@ class MasterFailoverTest : public KuduTest {
   shared_ptr<KuduClient> client_;
 };
 
+// Run the test with the HMS integration enabled and disabled.
+INSTANTIATE_TEST_CASE_P(HmsConfigurations, MasterFailoverTest,
+                        ::testing::Values(HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION));
+
 // Test that synchronous CreateTable (issue CreateTable call and then
 // wait until the table has been created) works even when the original
 // leader master has been paused.
-TEST_F(MasterFailoverTest, TestCreateTableSync) {
-  const char* kTableName = "default.testCreateTableSync";
+TEST_P(MasterFailoverTest, TestCreateTableSync) {
+  const char* kTableName = "default.test_create_table_sync";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -192,8 +197,8 @@ TEST_F(MasterFailoverTest, TestCreateTableSync) {
 // Test that we can issue a CreateTable call, pause the leader master
 // immediately after, then verify that the table has been created on
 // the newly elected leader master.
-TEST_F(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
-  const char* kTableName = "default.testPauseAfterCreateTableIssued";
+TEST_P(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
+  const char* kTableName = "default.test_pause_after_create_table_issued";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -223,8 +228,8 @@ TEST_F(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
 // Test the scenario where we create a table, pause the leader master,
 // and then issue the DeleteTable call: DeleteTable should go to the newly
 // elected leader master and succeed.
-TEST_F(MasterFailoverTest, TestDeleteTableSync) {
-  const char* kTableName = "default.testDeleteTableSync";
+TEST_P(MasterFailoverTest, TestDeleteTableSync) {
+  const char* kTableName = "default.test_delete_table_sync";
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
     return;
@@ -255,9 +260,9 @@ TEST_F(MasterFailoverTest, TestDeleteTableSync) {
 //
 // TODO(unknown): Add an equivalent async test. Add a test for adding and/or
 // renaming a column in a table.
-TEST_F(MasterFailoverTest, TestRenameTableSync) {
-  const char* kTableNameOrig = "default.testAlterTableSync";
-  const char* kTableNameNew = "default.testAlterTableSyncRenamed";
+TEST_P(MasterFailoverTest, TestRenameTableSync) {
+  const char* kTableNameOrig = "default.test_alter_table_sync";
+  const char* kTableNameNew = "default.test_alter_table_sync_renamed";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -284,8 +289,8 @@ TEST_F(MasterFailoverTest, TestRenameTableSync) {
 }
 
 
-TEST_F(MasterFailoverTest, TestKUDU1374) {
-  const char* kTableName = "default.testKUDU1374";
+TEST_P(MasterFailoverTest, TestKUDU1374) {
+  const char* kTableName = "default.test_kudu_1374";
 
   // Wait at least one additional heartbeat interval after creating the table.
   // The idea is to guarantee that all tservers sent a tablet report with the
@@ -328,7 +333,7 @@ TEST_F(MasterFailoverTest, TestKUDU1374) {
   NO_PENDING_FATALS();
 }
 
-TEST_F(MasterFailoverTest, TestMasterUUIDResolution) {
+TEST_P(MasterFailoverTest, TestMasterUUIDResolution) {
   // After a fresh start, the masters should have received RPCs asking for
   // their UUIDs.
   for (int i = 0; i < cluster_->num_masters(); i++) {
@@ -367,7 +372,7 @@ TEST_F(MasterFailoverTest, TestMasterUUIDResolution) {
   }
 }
 
-TEST_F(MasterFailoverTest, TestMasterPermanentFailure) {
+TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
   const string kBinPath = cluster_->GetBinaryPath("kudu");
   Random r(SeedRandom());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/integration-tests/master_hms-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 1648595..68c6728 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -70,6 +70,8 @@ class MasterHmsTest : public ExternalMiniClusterITestBase {
     opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
     opts.num_masters = 1;
     opts.num_tablet_servers = 1;
+    // Tune down the notification log poll period in order to speed up catalog convergence.
+    opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
     StartClusterWithOpts(std::move(opts));
 
     hms_client_.reset(new HmsClient(cluster_->hms()->address(), HmsClientOptions()));
@@ -133,6 +135,34 @@ class MasterHmsTest : public ExternalMiniClusterITestBase {
                          .Create();
   }
 
+  // Rename a table entry in the HMS catalog.
+  Status RenameHmsTable(const string& database_name,
+                        const string& old_table_name,
+                        const string& new_table_name) {
+    // The HMS doesn't have a rename table API. Instead it offers the more
+    // general AlterTable API, which requires the entire set of table fields to be
+    // set. Since we don't know these fields during a simple rename operation, we
+    // have to look them up.
+    hive::Table table;
+    RETURN_NOT_OK(hms_client_->GetTable(database_name, old_table_name, &table));
+    table.tableName = new_table_name;
+    return hms_client_->AlterTable(database_name, old_table_name, table);
+  }
+
+  // Drop all columns from a Kudu HMS table entry.
+  Status AlterHmsTableDropColumns(const string& database_name, const string& table_name) {
+    hive::Table table;
+    RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
+    table.sd.cols.clear();
+
+    // The KuduMetastorePlugin only allows the master to alter the columns in a
+    // Kudu table, so we pretend to be the master.
+    hive::EnvironmentContext env_ctx;
+    env_ctx.__set_properties({ std::make_pair(hms::HmsClient::kKuduMasterEventKey, "true") });
+    RETURN_NOT_OK(hms_client_->AlterTable(database_name, table_name, table, env_ctx));
+    return Status::OK();
+  }
+
   // Checks that the Kudu table schema and the HMS table entry in their
   // respective catalogs are synchronized for a particular table.
   void CheckTable(const string& database_name, const string& table_name) {
@@ -269,7 +299,7 @@ TEST_F(MasterHmsTest, TestRenameTable) {
   s = table_alterer->Alter();
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 
-  // Start the HMS and rename the table back to the original name.  This is the happy path.
+  // Start the HMS and rename the table through Kudu.
   ASSERT_OK(StartHms());
   ASSERT_EVENTUALLY([&] {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
@@ -278,53 +308,35 @@ TEST_F(MasterHmsTest, TestRenameTable) {
   NO_FATALS(CheckTable("db", "c"));
   NO_FATALS(CheckTableDoesNotExist("db", "a"));
 
-  // Drop the HMS table entry, then create a non-Kudu table entry in it's place,
-  // and attempt to rename the table.
-  ASSERT_OK(hms_client_->DropTable("db", "c"));
-  hive::Table external_table_2;
-  external_table_2.dbName = "db";
-  external_table_2.tableName = "c";
-  ASSERT_OK(hms_client_->CreateTable(external_table_2));
-  table_alterer.reset(client_->NewTableAlterer("db.c"));
-  s = table_alterer->RenameTo("db.a")->Alter();
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  // Rename the table through the HMS, and ensure the rename is handled in Kudu.
+  ASSERT_OK(RenameHmsTable("db", "c", "d"));
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(CheckTable("db", "d"));
+  });
 
-  // Check that all three tables still exist.
+  // Check that the two tables still exist.
   vector<string> tables;
   ASSERT_OK(hms_client_->GetAllTables("db", &tables));
   std::sort(tables.begin(), tables.end());
-  ASSERT_EQ(tables, vector<string>({ "b", "c" })) << tables;
+  ASSERT_EQ(tables, vector<string>({ "b", "d" })) << tables;
 }
 
 TEST_F(MasterHmsTest, TestAlterTable) {
-  const char* hms_database_name = "alter_db";
-  const char* hms_table_name = "table";
-  string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
-
-  ASSERT_OK(CreateDatabase(hms_database_name));
-
   // Create the Kudu table.
-  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a"));
 
-  // Alter the HMS table entry in a destructive way (remove the columns).
-  hive::Table hms_table;
-  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table));
-  hms_table.sd.cols.clear();
-  // The KuduMetastorePlugin requires column alteration events to come from a Kudu Master.
-  ASSERT_OK(hms_client_->AlterTable(hms_database_name, hms_table_name, hms_table, MasterEnvCtx()));
-  hive::Table altered_table;
-  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &altered_table));
-  ASSERT_TRUE(altered_table.sd.cols.empty());
-
-  // Drop a column. This should correct the entire set of columns in the HMS.
-  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_name));
+  // Alter the HMS table entry in a destructive way (remove all columns).
+  ASSERT_OK(AlterHmsTableDropColumns("default", "a"));
+
+  // Drop a column in Kudu. This should correct the entire set of columns in the HMS.
+  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.a"));
   ASSERT_OK(table_alterer->DropColumn("int8_val")->Alter());
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable("default", "a"));
 
   // Shutdown the HMS and try to alter the table.
   ASSERT_OK(StopHms());
-  table_alterer.reset(client_->NewTableAlterer(table_name)->DropColumn("int16_val"));
+  table_alterer.reset(client_->NewTableAlterer("default.a")->DropColumn("int16_val"));
   Status s = table_alterer->Alter();
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 
@@ -334,66 +346,103 @@ TEST_F(MasterHmsTest, TestAlterTable) {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
     ASSERT_OK(table_alterer->Alter());
   });
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
-
-  // Drop the table from the HMS, and insert a non-Kudu table entry, then try
-  // and alter the table.
-  ASSERT_OK(hms_client_->DropTable(hms_database_name, hms_table_name));
-  hms_table = hive::Table();
-  hms_table.dbName = hms_database_name;
-  hms_table.tableName = hms_table_name;
-  ASSERT_OK(hms_client_->CreateTable(hms_table));
-
-  table_alterer.reset(client_->NewTableAlterer(table_name));
-  s = table_alterer->DropColumn("int32_val")->Alter();
-  EXPECT_TRUE(s.IsNotFound()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "belongs to another table");
+  NO_FATALS(CheckTable("default", "a"));
 }
 
 TEST_F(MasterHmsTest, TestDeleteTable) {
-  const char* hms_database_name = "delete_db";
-  const char* hms_table_name = "table";
-  string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
-
-  ASSERT_OK(CreateDatabase(hms_database_name));
-
-  // Create the Kudu table, then drop it and ensure the HMS entry is removed.
-  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  // Create a Kudu table, then drop it from Kudu and ensure the HMS entry is removed.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a"));
   hive::Table hms_table;
-  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table));
-  ASSERT_OK(client_->DeleteTable(table_name));
-  NO_FATALS(CheckTableDoesNotExist(hms_database_name, hms_table_name));
+  ASSERT_OK(hms_client_->GetTable("default", "a", &hms_table));
 
-  // Create the Kudu table, remove the HMS entry, and ensure the Kudu table can
-  // not be dropped.
-  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
-  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table));
-  shared_ptr<KuduTable> table;
-  ASSERT_OK(client_->OpenTable(table_name, &table));
-  ASSERT_OK(hms_client_->DropTable(hms_database_name, hms_table_name));
-  Status s = hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-  s = client_->DeleteTable(table_name);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_OK(client_->DeleteTable("default.a"));
+  NO_FATALS(CheckTableDoesNotExist("default", "a"));
 
-  // Re-create the HMS catalog entry and try again.
-  ASSERT_OK(hms_client_->CreateTable(hms_table, MasterEnvCtx()));
-  ASSERT_OK(client_->DeleteTable(table_name));
+  // Create the Kudu table, then drop it from the HMS, and ensure the Kudu table is deleted.
+  ASSERT_OK(CreateKuduTable("default", "b"));
+  NO_FATALS(CheckTable("default", "b"));
+  hive::Table hms_table_b;
+  ASSERT_OK(hms_client_->GetTable("default", "b", &hms_table_b));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable("default.b", &table));
+  ASSERT_OK(hms_client_->DropTable("default", "b"));
+  ASSERT_EVENTUALLY([&] {
+      NO_FATALS(CheckTableDoesNotExist("default", "b"));
+  });
 
   // Ensure that dropping a table while the HMS is unreachable fails.
-  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  ASSERT_OK(CreateKuduTable("default", "c"));
+  NO_FATALS(CheckTable("default", "c"));
   ASSERT_OK(StopHms());
-  s = client_->DeleteTable(table_name);
+  Status s = client_->DeleteTable("default.c");
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
   ASSERT_OK(StartHms());
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable("default", "c"));
   ASSERT_EVENTUALLY([&] {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
-    ASSERT_OK(client_->DeleteTable(table_name));
+    ASSERT_OK(client_->DeleteTable("default.c"));
+  });
+  NO_FATALS(CheckTableDoesNotExist("default", "c"));
+}
+
+TEST_F(MasterHmsTest, TestNotificationLogListener) {
+  // Create a Kudu table.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a"));
+
+  // Rename the table in the HMS, and ensure that the notification log listener
+  // detects the rename and updates the Kudu catalog accordingly.
+  ASSERT_OK(RenameHmsTable("default", "a", "b"));
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS({
+      CheckTable("default", "b");
+      CheckTableDoesNotExist("default", "a");
+    });
   });
-  NO_FATALS(CheckTableDoesNotExist(hms_database_name, hms_table_name));
+
+  // Drop the table in the HMS, and ensure that the notification log listener
+  // detects the drop and updates the Kudu catalog accordingly.
+  ASSERT_OK(hms_client_->DropTable("default", "b"));
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(CheckTableDoesNotExist("default", "b"));
+  });
+
+  // Rename a table from A to B to A, and ensure that Kudu doesn't continue
+  // applying notification log events in a self-perpetuating loop.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  unique_ptr<KuduTableAlterer> table_alterer;
+  table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b"));
+  ASSERT_OK(table_alterer->Alter());
+  NO_FATALS(CheckTable("default", "b"));
+  table_alterer.reset(client_->NewTableAlterer("default.b")->RenameTo("default.a"));
+  ASSERT_OK(table_alterer->Alter());
+  NO_FATALS(CheckTable("default", "a"));
+
+
+  // Ensure that Kudu can rename a table just after it's been renamed through the HMS.
+  ASSERT_OK(RenameHmsTable("default", "a", "b"));
+  table_alterer.reset(client_->NewTableAlterer("default.b")->RenameTo("default.c"));
+  ASSERT_OK(table_alterer->Alter());
+
+  // Ensure that Kudu can drop a table just after it's been renamed through the HMS.
+  ASSERT_OK(RenameHmsTable("default", "c", "a"));
+  ASSERT_OK(client_->DeleteTable("default.a"));
+
+  // Test concurrent drops from the HMS and Kudu.
+
+  // Scenario 1: drop from the HMS first.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  ASSERT_OK(hms_client_->DropTable("default", "a"));
+  Status s = client_->DeleteTable("default.a");
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  NO_FATALS(CheckTableDoesNotExist("default", "a"));
+
+  // Scenario 2: drop from Kudu first.
+  ASSERT_OK(CreateKuduTable("default", "a"));
+  ASSERT_OK(client_->DeleteTable("default.a"));
+  s = hms_client_->DropTable("default", "a");
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  NO_FATALS(CheckTableDoesNotExist("default", "a"));
 }
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 07c7565..84a7d90 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -34,6 +34,7 @@ ADD_EXPORTABLE_LIBRARY(master_proto
 
 set(MASTER_SRCS
   catalog_manager.cc
+  hms_notification_log_listener.cc
   master.cc
   master_cert_authority.cc
   master_options.cc
@@ -72,6 +73,7 @@ set(KUDU_TEST_LINK_LIBS
   mini_hms)
 
 ADD_KUDU_TEST(catalog_manager-test)
+ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(mini_master-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(sys_catalog-test RESOURCE_LOCK "master-web-port")

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 55cf535..9e8d692 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -93,6 +93,7 @@
 #include "kudu/gutil/utf/utf.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/hms/hms_catalog.h"
+#include "kudu/master/hms_notification_log_listener.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master_cert_authority.h"
@@ -249,16 +250,17 @@ DECLARE_int64(tsk_rotation_seconds);
 
 using base::subtle::NoBarrier_CompareAndSwap;
 using base::subtle::NoBarrier_Load;
+using boost::optional;
 using kudu::cfile::TypeEncodingInfo;
 using kudu::consensus::ConsensusServiceProxy;
 using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::GetConsensusRole;
 using kudu::consensus::IsRaftConfigMember;
+using kudu::consensus::MajorityHealthPolicy;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftConsensus;
 using kudu::consensus::RaftPeerPB;
 using kudu::consensus::StartTabletCopyRequestPB;
-using kudu::consensus::MajorityHealthPolicy;
 using kudu::consensus::kMinimumTerm;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
@@ -446,7 +448,7 @@ class CatalogManagerBgTasks {
 
   ~CatalogManagerBgTasks() {}
 
-  Status Init();
+  Status Init() WARN_UNUSED_RESULT;
   void Shutdown();
 
   void Wake() {
@@ -467,7 +469,6 @@ class CatalogManagerBgTasks {
  private:
   void Run();
 
- private:
   Atomic32 closing_;
   bool pending_updates_;
   mutable Mutex lock_;
@@ -579,7 +580,6 @@ void CatalogManagerBgTasks::Run() {
 
 namespace {
 
-
 string RequestorString(RpcContext* rpc) {
   if (rpc) {
     return rpc->requestor_string();
@@ -663,6 +663,7 @@ CatalogManager::CatalogManager(Master* master)
     rng_(GetRandomSeed32()),
     state_(kConstructed),
     leader_ready_term_(-1),
+    hms_notification_log_event_id_(-1),
     leader_lock_(RWMutex::Priority::PREFER_WRITING) {
   CHECK_OK(ThreadPoolBuilder("leader-initialization")
            // Presently, this thread pool must contain only a single thread
@@ -708,6 +709,10 @@ Status CatalogManager::Init(bool is_first_run) {
     hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses)));
     RETURN_NOT_OK_PREPEND(hms_catalog_->Start(),
                           "failed to start Hive Metastore catalog");
+
+    hms_notification_log_listener_.reset(new HmsNotificationLogListenerTask(this));
+    RETURN_NOT_OK_PREPEND(hms_notification_log_listener_->Init(),
+        "failed to initialize Hive Metastore notification log listener task");
   }
 
   std::lock_guard<LockType> l(lock_);
@@ -852,7 +857,7 @@ Status CatalogManager::InitCertAuthorityWith(
   RETURN_NOT_OK_PREPEND(tls->AddTrustedCertificate(ca->ca_cert()),
                         "could not trust master CA cert");
   // If we haven't signed our own server cert yet, do so.
-  boost::optional<security::CertSignRequest> csr = tls->GetCsrIfNecessary();
+  optional<security::CertSignRequest> csr = tls->GetCsrIfNecessary();
   if (csr) {
     Cert cert;
     RETURN_NOT_OK_PREPEND(ca->SignServerCSR(*csr, &cert),
@@ -1025,6 +1030,18 @@ void CatalogManager::PrepareForLeadershipTask() {
         return;
       }
     }
+
+    if (hms_catalog_) {
+      static const char* const kNotificationLogEventIdDescription =
+          "Loading latest processed Hive Metastore notification log event ID";
+      LOG(INFO) << kNotificationLogEventIdDescription << "...";
+      LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + kNotificationLogEventIdDescription) {
+        if (!check(std::bind(&CatalogManager::InitLatestNotificationLogEventId, this),
+                   *consensus, term, kNotificationLogEventIdDescription).ok()) {
+          return;
+        }
+      }
+    }
   }
 
   std::lock_guard<simple_spinlock> l(state_lock_);
@@ -1175,6 +1192,7 @@ void CatalogManager::Shutdown() {
   }
 
   if (hms_catalog_) {
+    hms_notification_log_listener_->Shutdown();
     hms_catalog_->Stop();
   }
 
@@ -1262,7 +1280,7 @@ Status ValidateIdentifier(const string& id) {
 }
 
 // Validate the client-provided schema and name.
-Status ValidateClientSchema(const boost::optional<string>& name,
+Status ValidateClientSchema(const optional<string>& name,
                             const Schema& schema) {
   if (name != boost::none) {
     RETURN_NOT_OK_PREPEND(ValidateIdentifier(name.get()), "invalid table name");
@@ -1310,6 +1328,12 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
+  // If the HMS integration is enabled, wait for the notification log listener
+  // to catch up. This reduces the likelihood of attempting to create a table
+  // with a name that conflicts with a table that has just been deleted or
+  // renamed in the HMS.
+  RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
+
   // Copy the request, so we can fill in some defaults.
   CreateTableRequestPB req = *orig_req;
   LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
@@ -1706,10 +1730,71 @@ Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
                                       rpc::RpcContext* rpc) {
   LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
                           RequestorString(rpc), SecureShortDebugString(req));
-  return DeleteTable(req, resp);
+
+  leader_lock_.AssertAcquiredForReading();
+  RETURN_NOT_OK(CheckOnline());
+
+  // If the HMS integration is enabled, then don't directly remove the table
+  // from the Kudu catalog. Instead, delete the table from the HMS and wait for
+  // the notification log listener to apply the corresponding event to the
+  // catalog. By 'serializing' the drop through the HMS, race conditions are
+  // avoided.
+  if (hms_catalog_) {
+    // Wait for the notification log listener to catch up. This reduces the
+    // likelihood of attempting to delete a table which has just been deleted or
+    // renamed in the HMS.
+    RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
+
+    // Look up and lock the table.
+    scoped_refptr<TableInfo> table;
+    TableMetadataLock l;
+    RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::READ, &table, &l));
+    if (l.data().is_deleted()) {
+      return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()),
+          resp, MasterErrorPB::TABLE_NOT_FOUND);
+    }
+
+    // Drop the table from the HMS.
+    RETURN_NOT_OK(SetupError(
+          hms_catalog_->DropTable(table->id(), l.data().name()),
+          resp, MasterErrorPB::HIVE_METASTORE_ERROR));
+
+    // Unlock the table, and wait for the notification log listener to handle
+    // the delete table event.
+    l.Unlock();
+    return WaitForNotificationLogListenerCatchUp(resp, rpc);
+  }
+
+  // If the HMS integration isn't enabled, then delete the table directly from
+  // the Kudu catalog.
+  return DeleteTable(req, resp, boost::none);
 }
 
-Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableResponsePB* resp) {
+Status CatalogManager::DeleteTableHms(const string& table_name,
+                                      const string& table_id,
+                                      int64_t notification_log_event_id) {
+  LOG(INFO) << "Deleting table " << table_name
+            << " [id=" << table_id
+            << "] in response to Hive Metastore notification log event "
+            << notification_log_event_id;
+
+  DeleteTableRequestPB req;
+  DeleteTableResponsePB resp;
+  req.mutable_table()->set_table_name(table_name);
+  req.mutable_table()->set_table_id(table_id);
+
+  RETURN_NOT_OK(DeleteTable(req, &resp, notification_log_event_id));
+
+  // Update the cached HMS notification log event ID, if it changed.
+  DCHECK_GT(notification_log_event_id, hms_notification_log_event_id_);
+  hms_notification_log_event_id_ = notification_log_event_id;
+
+  return Status::OK();
+}
+
+Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
+                                   DeleteTableResponsePB* resp,
+                                   optional<int64_t> hms_notification_log_event_id) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
@@ -1722,42 +1807,11 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableR
         resp, MasterErrorPB::TABLE_NOT_FOUND);
   }
 
-  // 2. Drop the HMS table entry.
-  //
-  // This step comes before modifying the sys catalog so that we can ensure the
-  // HMS is available. We do not allow dropping tables while the HMS is
-  // unavailable, because that would cause the catalogs to become unsynchronized.
-  if (hms_catalog_) {
-    Status s = hms_catalog_->DropTable(table->id(), l.data().name());
-    if (!s.ok()) {
-      s.CloneAndPrepend(Substitute("an error occurred while dropping table $0 in the HMS",
-                                   l.data().name()));
-      LOG(WARNING) << s.ToString();
-      return SetupError(std::move(s), resp, MasterErrorPB::HIVE_METASTORE_ERROR);
-    }
-  }
-  // Re-create the HMS entry if we exit early.
-  auto abort_hms = MakeScopedCleanup([&] {
-      // TODO(dan): figure out how to test this.
-      if (hms_catalog_) {
-        TRACE("Rolling back HMS table deletion");
-        Schema schema;
-        Status s = SchemaFromPB(l.data().pb.schema(), &schema);
-        if (!s.ok()) {
-          LOG(WARNING) << "Failed to decode schema during HMS table entry deletion roll-back: "
-                       << s.ToString();
-          return;
-        }
-        WARN_NOT_OK(hms_catalog_->CreateTable(table->id(), l.data().name(), schema),
-                    "An error occurred while attempting to roll-back HMS table entry deletion");
-      }
-  });
-
   TRACE("Modifying in-memory table state")
   string deletion_msg = "Table deleted at " + LocalTimeAsString();
   l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
 
-  // 3. Look up the tablets, lock them, and mark them as deleted.
+  // 2. Look up the tablets, lock them, and mark them as deleted.
   {
     TRACE("Locking tablets");
     vector<scoped_refptr<TabletInfo>> tablets;
@@ -1771,9 +1825,10 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableR
           SysTabletsEntryPB::DELETED, deletion_msg);
     }
 
-    // 4. Update sys-catalog with the removed table and tablet state.
+    // 3. Update sys-catalog with the removed table and tablet state.
     TRACE("Removing table and tablets from system table");
     SysCatalogTable::Actions actions;
+    actions.hms_notification_log_event_id = hms_notification_log_event_id;
     actions.table_to_update = table;
     actions.tablets_to_update.assign(tablets.begin(), tablets.end());
     Status s = sys_catalog_->Write(actions);
@@ -1784,10 +1839,7 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableR
       return s;
     }
 
-    // The operation has been written to sys-catalog; now it must succeed.
-    abort_hms.cancel();
-
-    // 5. Remove the table from the by-name map.
+    // 4. Remove the table from the by-name map.
     {
       TRACE("Removing table from by-name map");
       std::lock_guard<LockType> l_map(lock_);
@@ -1798,19 +1850,19 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableR
       }
     }
 
-    // 6. Commit the dirty tablet state.
+    // 5. Commit the dirty tablet state.
     lock.Commit();
   }
 
-  // 7. Commit the dirty table state.
+  // 6. Commit the dirty table state.
   TRACE("Committing in-memory state");
   l.Commit();
 
-  // 8. Abort any extant tasks belonging to the table.
+  // 7. Abort any extant tasks belonging to the table.
   TRACE("Aborting table tasks");
   table->AbortTasks();
 
-  // 9. Send a DeleteTablet() request to each tablet replica in the table.
+  // 8. Send a DeleteTablet() request to each tablet replica in the table.
   SendDeleteTableRequest(table, deletion_msg);
 
   VLOG(1) << "Deleted table " << table->ToString();
@@ -2073,12 +2125,79 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
 Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
                                      AlterTableResponsePB* resp,
                                      rpc::RpcContext* rpc) {
+  leader_lock_.AssertAcquiredForReading();
+  RETURN_NOT_OK(CheckOnline());
+
+  // If the HMS integration is enabled, wait for the notification log listener
+  // to catch up. This reduces the likelihood of attempting to apply an
+  // alteration to a table which has just been renamed or deleted through the HMS.
+  RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
+
   LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
                           RequestorString(rpc), SecureShortDebugString(req));
-  return AlterTable(req, resp);
+
+  // If the HMS integration is enabled and the alteration includes a table
+  // rename, then don't directly rename the table in the Kudu catalog. Instead,
+  // rename the table in the HMS and wait for the notification log listener to
+  // apply that event to the catalog. By 'serializing' the rename through the
+  // HMS, race conditions are avoided.
+  if (hms_catalog_ && req.has_new_table_name()) {
+    // Look up the table, lock it, and mark it as removed.
+    scoped_refptr<TableInfo> table;
+    TableMetadataLock l;
+    RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::READ, &table, &l));
+    RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
+
+    Schema schema;
+    RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
+
+    // Rename the table in the HMS.
+    RETURN_NOT_OK(SetupError(hms_catalog_->AlterTable(
+            table->id(), l.data().name(), req.new_table_name(),
+            schema),
+        resp, MasterErrorPB::HIVE_METASTORE_ERROR));
+
+    // Unlock the table, and wait for the notification log listener to handle
+    // the alter table event.
+    l.Unlock();
+    RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
+
+    // Finally, apply the remaining schema and partitioning alterations to the
+    // local catalog. Since Kudu holds the canonical version of table schemas
+    // and partitions the HMS is not updated first.
+    AlterTableRequestPB r(req);
+    r.mutable_table()->clear_table_name();
+    r.mutable_table()->set_table_id(table->id());
+    r.clear_new_table_name();
+
+    return AlterTable(r, resp, boost::none);
+  }
+
+  return AlterTable(req, resp, boost::none);
+}
+
+Status CatalogManager::RenameTableHms(const string& table_id,
+                                      const string& table_name,
+                                      const string& new_table_name,
+                                      int64_t notification_log_event_id) {
+  AlterTableRequestPB req;
+  AlterTableResponsePB resp;
+  req.mutable_table()->set_table_id(table_id);
+  req.mutable_table()->set_table_name(table_name);
+  req.set_new_table_name(new_table_name);
+
+  RETURN_NOT_OK(AlterTable(req, &resp, notification_log_event_id));
+
+  // Update the cached HMS notification log event ID.
+  DCHECK_GT(notification_log_event_id, hms_notification_log_event_id_);
+  hms_notification_log_event_id_ = notification_log_event_id;
+
+  return Status::OK();
 }
 
-Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp) {
+Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
+                                  AlterTableResponsePB* resp,
+                                  optional<int64_t> hms_notification_log_event_id) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
@@ -2230,45 +2349,11 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResp
                                            LocalTimeAsString()));
   }
 
-  // 7. Update the HMS table entry.
-  //
-  // This step comes before modifying the sys catalog so that we can ensure the
-  // HMS is available. We do not allow altering tables while the HMS is
-  // unavailable, because that would cause the catalogs to become unsynchronized.
-  if (hms_catalog_ != nullptr && has_metadata_changes) {
-    const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
-    Status s = hms_catalog_->AlterTable(table->id(), table_name, new_name, new_schema);
-
-    if (!s.ok()) {
-      s = s.CloneAndPrepend(Substitute("an error occurred while altering table $0 in the HMS",
-                                       table_name));
-      LOG(WARNING) << s.ToString();
-      return SetupError(std::move(s), resp, MasterErrorPB::HIVE_METASTORE_ERROR);
-    }
-  }
-  // Roll-back the HMS alteration if we exit early.
-  auto abort_hms = MakeScopedCleanup([&] {
-      // TODO(dan): figure out how to test this.
-      if (hms_catalog_) {
-        TRACE("Rolling back HMS table alterations");
-        Schema schema;
-        Status s = SchemaFromPB(l.data().pb.schema(), &schema);
-        if (!s.ok()) {
-          LOG(WARNING) << "Failed to decode schema during HMS table entry alteration roll-back: "
-                       << s.ToString();
-          return;
-        }
-        const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
-
-        WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), new_name, table_name, schema),
-                    "An error occurred while attempting to roll-back HMS table entry alteration");
-      }
-  });
-
-  // 8. Update sys-catalog with the new table schema and tablets to add/drop.
+  // 7. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
   string deletion_msg = "Partition dropped at " + LocalTimeAsString();
   SysCatalogTable::Actions actions;
+  actions.hms_notification_log_event_id = hms_notification_log_event_id;
   if (!tablets_to_add.empty() || has_metadata_changes) {
     // If anything modified the table's persistent metadata, then sync it to the sys catalog.
     actions.table_to_update = table;
@@ -2294,12 +2379,10 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResp
     return s;
   }
 
-  // 9. Commit the in-memory state.
+  // 8. Commit the in-memory state.
   {
     TRACE("Committing alterations to in-memory state");
 
-    abort_hms.cancel();
-
     // Commit new tablet in-memory state. This doesn't require taking the global
     // lock since the new tablets are not yet visible, because they haven't been
     // added to the table or tablet index.
@@ -2350,6 +2433,21 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResp
   // the tablet again.
   tablets_to_drop_lock.Commit();
 
+  // If there are schema changes, then update the entry in the Hive Metastore.
+  // This is done on a best-effort basis, since Kudu is the source of truth for
+  // table schema information, and the table has already been altered in the
+  // Kudu catalog via the successful sys-table write above.
+  if (hms_catalog_ && has_schema_changes) {
+    // Sanity check: if there are schema changes then this is necessarily not a
+    // table rename, since we split out the rename portion into its own
+    // 'transaction' which is serialized through the HMS.
+    DCHECK(!req.has_new_table_name());
+    WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), table_name, table_name, new_schema),
+                Substitute(
+                  "failed to alter HiveMetastore schema for table $0, "
+                  "HMS schema information will be stale", table->ToString()));
+  }
+
   if (!tablets_to_add.empty() || has_metadata_changes) {
     l.Commit();
   } else {
@@ -2943,7 +3041,7 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
       Master* master, const string& permanent_uuid,
       const scoped_refptr<TableInfo>& table, string tablet_id,
       TabletDataState delete_type,
-      boost::optional<int64_t> cas_config_opid_index_less_or_equal,
+      optional<int64_t> cas_config_opid_index_less_or_equal,
       string reason)
       : RetrySpecificTSRpcTask(master, permanent_uuid, table),
         tablet_id_(std::move(tablet_id)),
@@ -3021,7 +3119,7 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
 
   const string tablet_id_;
   const TabletDataState delete_type_;
-  const boost::optional<int64_t> cas_config_opid_index_less_or_equal_;
+  const optional<int64_t> cas_config_opid_index_less_or_equal_;
   const string reason_;
   tserver::DeleteTabletResponsePB resp_;
 };
@@ -3850,6 +3948,34 @@ Status CatalogManager::ProcessTabletReport(
   return Status::OK();
 }
 
+int64_t CatalogManager::GetLatestNotificationLogEventId() {
+  DCHECK(hms_catalog_);
+  leader_lock_.AssertAcquiredForReading();
+  return hms_notification_log_event_id_;
+}
+
+Status CatalogManager::InitLatestNotificationLogEventId() {
+  DCHECK(hms_catalog_);
+  leader_lock_.AssertAcquiredForWriting();
+  int64_t hms_notification_log_event_id;
+  RETURN_NOT_OK(sys_catalog_->GetLatestNotificationLogEventId(&hms_notification_log_event_id));
+  hms_notification_log_event_id_ = hms_notification_log_event_id;
+  return Status::OK();
+}
+
+Status CatalogManager::StoreLatestNotificationLogEventId(int64_t event_id) {
+  DCHECK(hms_catalog_);
+  DCHECK_GT(event_id, hms_notification_log_event_id_);
+  leader_lock_.AssertAcquiredForReading();
+  SysCatalogTable::Actions actions;
+  actions.hms_notification_log_event_id = event_id;
+  RETURN_NOT_OK_PREPEND(
+      sys_catalog()->Write(actions),
+      "Failed to update processed Hive Metastore notification log ID in the sys catalog table");
+  hms_notification_log_event_id_ = event_id;
+  return Status::OK();
+}
+
 std::shared_ptr<RaftConsensus> CatalogManager::master_consensus() const {
   // CatalogManager::InitSysCatalogAsync takes lock_ in exclusive mode in order
   // to initialize sys_catalog_, so it's sufficient to take lock_ in shared mode
@@ -4388,7 +4514,7 @@ Status CatalogManager::GetTabletLocations(const string& tablet_id,
   return BuildLocationsForTablet(tablet_info, filter, locs_pb);
 }
 
-Status CatalogManager::ReplaceTablet(const std::string& tablet_id, ReplaceTabletResponsePB* resp) {
+Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletResponsePB* resp) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
@@ -4605,6 +4731,31 @@ void CatalogManager::AbortAndWaitForAllTasks(
     t->WaitTasksCompletion();
   }
 }
+
+template<typename RespClass>
+Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
+                                                             rpc::RpcContext* rpc) {
+  if (hms_catalog_) {
+    Status s = hms_notification_log_listener_->WaitForCatchUp(rpc->GetClientDeadline());
+    // ServiceUnavailable indicates the master has lost leadership.
+    MasterErrorPB::Code code = s.IsServiceUnavailable() ?
+      MasterErrorPB::NOT_THE_LEADER :
+      MasterErrorPB::HIVE_METASTORE_ERROR;
+    return SetupError(s, resp, code);
+  }
+  return Status::OK();
+}
+
+const char* CatalogManager::StateToString(State state) {
+  switch (state) {
+    case CatalogManager::kConstructed: return "Constructed";
+    case CatalogManager::kStarting: return "Starting";
+    case CatalogManager::kRunning: return "Running";
+    case CatalogManager::kClosing: return "Closing";
+  }
+  __builtin_unreachable();
+}
+
 ////////////////////////////////////////////////////////////
 // CatalogManager::ScopedLeaderSharedLock
 ////////////////////////////////////////////////////////////
@@ -4622,7 +4773,7 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
   if (PREDICT_FALSE(catalog_->state_ != kRunning)) {
     catalog_status_ = Status::ServiceUnavailable(
         Substitute("Catalog manager is not initialized. State: $0",
-                   catalog_->state_));
+                   StateToString(catalog_->state_)));
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 447a391..d27975a 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_MASTER_CATALOG_MANAGER_H
 #define KUDU_MASTER_CATALOG_MANAGER_H
 
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <iosfwd>
@@ -30,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
@@ -58,7 +60,7 @@ class NodeInstancePB;
 class PartitionPB;
 class PartitionSchema;
 class Schema;
-class ThreadPool; // IWYU pragma: keep
+class ThreadPool;
 struct ColumnId;
 
 // Working around FRIEND_TEST() ugliness.
@@ -73,7 +75,7 @@ class RpcContext;
 namespace security {
 class Cert;
 class PrivateKey;
-class TokenSigningPublicKeyPB; // IWYU pragma: keep
+class TokenSigningPublicKeyPB;
 } // namespace security
 
 namespace consensus {
@@ -89,9 +91,14 @@ namespace hms {
 class HmsCatalog;
 } // namespace hms
 
+namespace hms {
+class HmsCatalog;
+}
+
 namespace master {
 
-class CatalogManagerBgTasks; // IWYU pragma: keep
+class CatalogManagerBgTasks;
+class HmsNotificationLogListenerTask;
 class Master;
 class SysCatalogTable;
 class TSDescriptor;
@@ -539,7 +546,13 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // but this function does not itself respond to the RPC.
   Status DeleteTableRpc(const DeleteTableRequestPB& req,
                         DeleteTableResponsePB* resp,
-                        rpc::RpcContext* rpc);
+                        rpc::RpcContext* rpc) WARN_UNUSED_RESULT;
+
+  // Delete the specified table in response to a 'DROP TABLE' HMS notification
+  // log listener event.
+  Status DeleteTableHms(const std::string& table_name,
+                        const std::string& table_id,
+                        int64_t notification_log_event_id) WARN_UNUSED_RESULT;
 
   // Alter the specified table in response to an AlterTableRequest RPC.
   //
@@ -549,6 +562,13 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                        AlterTableResponsePB* resp,
                        rpc::RpcContext* rpc);
 
+  // Rename the specified table in response to an 'ALTER TABLE RENAME' HMS
+  // notification log listener event.
+  Status RenameTableHms(const std::string& table_id,
+                        const std::string& table_name,
+                        const std::string& new_table_name,
+                        int64_t notification_log_event_id) WARN_UNUSED_RESULT;
+
   // Get the information about an in-progress alter operation
   //
   // The RPC context is provided for logging/tracing purposes,
@@ -593,6 +613,18 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                              TabletReportUpdatesPB* full_report_update,
                              rpc::RpcContext* rpc);
 
+  // Returns the latest handled Hive Metastore notification log event ID.
+  int64_t GetLatestNotificationLogEventId();
+
+  // Initializes the cached latest handled Hive Metastore notification log event ID
+  // after a leader election.
+  Status InitLatestNotificationLogEventId();
+
+  // Stores the latest handled Hive Metastore notification log event ID.
+  //
+  // Must only be called by the singleton notification log listener thread.
+  Status StoreLatestNotificationLogEventId(int64_t event_id);
+
   SysCatalogTable* sys_catalog() { return sys_catalog_.get(); }
 
   // Returns the Master tablet's RaftConsensus instance if it is initialized, or
@@ -647,6 +679,10 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // must be initialized before calling this method.
   consensus::RaftPeerPB::Role Role() const;
 
+  hms::HmsCatalog* HmsCatalog() const {
+    return hms_catalog_.get();
+  }
+
  private:
   // These tests call ElectedAsLeaderCb() directly.
   FRIEND_TEST(MasterTest, TestShutdownDuringTableVisit);
@@ -665,13 +701,20 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
   // Delete the specified table in the catalog.
+  //
+  // If a notification log event ID is provided, it will be written to the sys
+  // catalog.
   Status DeleteTable(const DeleteTableRequestPB& req,
-                     DeleteTableResponsePB* resp) WARN_UNUSED_RESULT;
-
+                     DeleteTableResponsePB* resp,
+                     boost::optional<int64_t> hms_notification_log_event_id) WARN_UNUSED_RESULT;
 
   // Alter the specified table in the catalog.
+  //
+  // If a notification log event ID is provided, it will be written to the sys
+  // catalog along with the altered table metadata.
   Status AlterTable(const AlterTableRequestPB& req,
-                    AlterTableResponsePB* resp) WARN_UNUSED_RESULT;
+                    AlterTableResponsePB* resp,
+                    boost::optional<int64_t> hms_notification_log_event_id) WARN_UNUSED_RESULT;
 
   // Called by SysCatalog::SysCatalogStateChanged when this node
   // becomes the leader of a consensus configuration. Executes
@@ -895,6 +938,13 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Aborts all tasks belonging to 'tables' and waits for them to finish.
   void AbortAndWaitForAllTasks(const std::vector<scoped_refptr<TableInfo>>& tables);
 
+  // Wait for the Hive Metastore notification log listener to process the latest
+  // events, if the HMS integration is enabled. Handles setting the correct
+  // response code in the case of an error.
+  template<typename RespClass>
+  Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
+                                               rpc::RpcContext* rpc) WARN_UNUSED_RESULT;
+
   // TODO: the maps are a little wasteful of RAM, since the TableInfo/TabletInfo
   // objects have a copy of the string key. But STL doesn't make it
   // easy to make a "gettable set".
@@ -938,6 +988,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   gscoped_ptr<CatalogManagerBgTasks> background_tasks_;
 
   std::unique_ptr<hms::HmsCatalog> hms_catalog_;
+  std::unique_ptr<HmsNotificationLogListenerTask> hms_notification_log_listener_;
 
   enum State {
     kConstructed,
@@ -946,6 +997,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
     kClosing
   };
 
+  static const char* StateToString(State state);
+
   // Lock protecting state_, leader_ready_term_
   mutable simple_spinlock state_lock_;
   State state_;
@@ -961,6 +1014,13 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // correctly.
   int64_t leader_ready_term_;
 
+  // This field is updated when a node becomes leader master, and the HMS
+  // integration is enabled. It caches the latest processed Hive Metastore
+  // notification log event ID so that every request does not need to hit the
+  // sys catalog. Must only be written to by the HMS notification log listener
+  // thread.
+  std::atomic<int64_t> hms_notification_log_event_id_;
+
   // Lock used to fence operations and leader elections. All logical operations
   // (i.e. create table, alter table, etc.) should acquire this lock for
   // reading. Following an election where this master is elected leader, it

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/hms_notification_log_listener-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/hms_notification_log_listener-test.cc b/src/kudu/master/hms_notification_log_listener-test.cc
new file mode 100644
index 0000000..415c4c6
--- /dev/null
+++ b/src/kudu/master/hms_notification_log_listener-test.cc
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/hms_notification_log_listener.h"
+
+#include <cstdint>
+#include <thread>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_uint32(hive_metastore_notification_log_poll_period_seconds);
+DECLARE_uint32(hive_metastore_notification_log_poll_inject_latency_ms);
+
+namespace kudu {
+namespace master {
+
+class HmsNotificationLogListenerTest : public KuduTest {
+ public:
+  // Caches the poll period flag (in seconds) so tests can derive deadlines
+  // shorter than a full poll period, proving a wait was short-circuited.
+  uint32_t poll_period_ = FLAGS_hive_metastore_notification_log_poll_period_seconds;
+};
+
+// Test that an immediate shutdown will short-circuit the poll period.
+TEST_F(HmsNotificationLogListenerTest, TestImmediateShutdown) {
+  // A null catalog manager puts the task in unit-test mode (no HMS access).
+  HmsNotificationLogListenerTask listener(nullptr);
+  ASSERT_OK(listener.Init());
+
+  // Wait a bit to ensure the task thread enters the poll wait.
+  SleepFor(MonoDelta::FromMilliseconds(100));
+
+  MonoTime start = MonoTime::Now();
+  listener.Shutdown();
+  // Shutdown must return well before a full poll period elapses.
+  ASSERT_LT(MonoTime::Now() - start, MonoDelta::FromSeconds(poll_period_ / 2));
+}
+
+// Test that WaitForCatchUp will short-circuit the poll period.
+TEST_F(HmsNotificationLogListenerTest, TestPoll) {
+  HmsNotificationLogListenerTask listener(nullptr);
+  ASSERT_OK(listener.Init());
+
+  // Wait a bit to ensure the task thread enters the poll wait.
+  SleepFor(MonoDelta::FromMilliseconds(100));
+
+  // The deadline is well under a poll period, so success implies the wait
+  // was short-circuited rather than timed out.
+  ASSERT_OK(listener.WaitForCatchUp(MonoTime::Now() + MonoDelta::FromSeconds(poll_period_ / 2)));
+}
+
+// Test that WaitForCatchUp will short-circuit the poll period, even when the
+// task is in the middle of polling when the wait initiates.
+TEST_F(HmsNotificationLogListenerTest, TestWaitWhilePolling) {
+  // In unit-test mode the injected latency is the body of Poll(), making it
+  // likely the task is mid-poll when the wait below begins.
+  FLAGS_hive_metastore_notification_log_poll_inject_latency_ms = 100;
+
+  HmsNotificationLogListenerTask listener(nullptr);
+  ASSERT_OK(listener.Init());
+
+  ASSERT_OK(listener.WaitForCatchUp(MonoTime::Now() + MonoDelta::FromSeconds(poll_period_ / 2)));
+}
+
+// Test that shutting down with a waiter will result in the waiter receiving an error.
+TEST_F(HmsNotificationLogListenerTest, TestWaitAndShutdown) {
+  // Inject some latency to ensure that the wait occurs when the task is
+  // polling, otherwise it could immediately begin servicing the wait and not
+  // actually see the shutdown.
+  FLAGS_hive_metastore_notification_log_poll_inject_latency_ms = 100;
+
+  HmsNotificationLogListenerTask listener(nullptr);
+  ASSERT_OK(listener.Init());
+
+  auto waiter = std::thread([&] {
+      Status s = listener.WaitForCatchUp(MonoTime::Now() +
+                                         MonoDelta::FromSeconds(poll_period_ / 2));
+      CHECK(s.IsServiceUnavailable());
+  });
+
+  // There's a race between the waiter thread checking the closed_ flag in
+  // WaitForCatchUp and this thread setting the flag in Shutdown. This test is
+  // trying to exercise the case where the waiter is able to enqueue the
+  // callback, so to make that more likely we slow down the call to Shutdown.
+  SleepFor(MonoDelta::FromMilliseconds(10));
+
+  listener.Shutdown();
+  waiter.join();
+}
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/hms_notification_log_listener.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
new file mode 100644
index 0000000..e067dda
--- /dev/null
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/hms_notification_log_listener.h"
+
+#include <cstdint>
+#include <map>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/hms/hive_metastore_types.h"
+#include "kudu/hms/hms_catalog.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status_callback.h"
+#include "kudu/util/thread.h"
+
+DEFINE_uint32(hive_metastore_notification_log_poll_period_seconds, 15,
+              "Amount of time the notification log listener waits between attempts to poll "
+              "the Hive Metastore for catalog updates.");
+TAG_FLAG(hive_metastore_notification_log_poll_period_seconds, advanced);
+TAG_FLAG(hive_metastore_notification_log_poll_period_seconds, runtime);
+TAG_FLAG(hive_metastore_notification_log_poll_period_seconds, experimental);
+
+DEFINE_int32(hive_metastore_notification_log_batch_size, 100,
+             "Number of notification log entries which are retrieved from the Hive Metastore "
+             "per batch when polling.");
+TAG_FLAG(hive_metastore_notification_log_batch_size, advanced);
+TAG_FLAG(hive_metastore_notification_log_batch_size, runtime);
+TAG_FLAG(hive_metastore_notification_log_batch_size, experimental);
+
+// Note: the trailing space after "Hive Metastore " is required; adjacent
+// string literals concatenate, and without it the help text read
+// "Metastorenotification".
+DEFINE_uint32(hive_metastore_notification_log_poll_inject_latency_ms, 0,
+              "Inject latency into the inner polling loop of the Hive Metastore "
+              "notification log listener. Only takes effect during unit tests.");
+TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, unsafe);
+TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, runtime);
+
+using rapidjson::Document;
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+// Status message returned when the task is shutdown.
+static const char* kShutdownMessage =
+  "Hive Metastore notification log listener is shutting down";
+
+// 'catalog_manager' may be nullptr, which puts the task in unit-test mode:
+// Poll() then only simulates latency and never touches the catalog.
+HmsNotificationLogListenerTask::HmsNotificationLogListenerTask(CatalogManager* catalog_manager)
+  : catalog_manager_(catalog_manager),
+    closing_(false),
+    wake_up_cv_(&lock_) {
+}
+
+HmsNotificationLogListenerTask::~HmsNotificationLogListenerTask() {
+  // Shut the listener thread down if the owner did not do so explicitly.
+  if (thread_) {
+    Shutdown();
+  }
+}
+
+// Spawns the singleton listener thread, which executes RunLoop().
+Status HmsNotificationLogListenerTask::Init() {
+  CHECK(!thread_) << "HmsNotificationLogListenerTask is already initialized";
+  return kudu::Thread::Create("catalog manager", "hms-notification-log-listener",
+                              &HmsNotificationLogListenerTask::RunLoop, this, &thread_);
+}
+
+// Signals the listener thread to exit and joins it. Must be called at most
+// once, and only after a successful Init().
+void HmsNotificationLogListenerTask::Shutdown() {
+  CHECK(thread_) << "HmsNotificationLogListenerTask is not initialized";
+  {
+    std::lock_guard<Mutex> l(lock_);
+    DCHECK(!closing_);
+    closing_ = true;
+    // Wake the listener thread if it is waiting out the poll period.
+    wake_up_cv_.Signal();
+  }
+  CHECK_OK(ThreadJoiner(thread_.get()).Join());
+  thread_.reset();
+}
+
+// Enqueues a catch-up callback and blocks until the listener thread has
+// completed a full poll (or the deadline passes / the task shuts down).
+Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline) {
+  Synchronizer synchronizer;
+
+  {
+    std::lock_guard<Mutex> l(lock_);
+    if (closing_) {
+      return Status::ServiceUnavailable(kShutdownMessage);
+    }
+    // Enqueue a callback which the listener thread completes after its next
+    // full poll, and wake the thread in case it is waiting out the poll
+    // period. (A previously-declared local from AsStdStatusCallback() was
+    // unused and has been removed.)
+    catch_up_callbacks_.emplace_back(synchronizer.AsStatusCallback());
+    wake_up_cv_.Signal();
+  }
+
+  RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()),
+                        "failed to wait for Hive Metastore notification log listener to catch up");
+  return Status::OK();
+}
+
+// Listener thread main loop: repeatedly polls the notification log,
+// completing any WaitForCatchUp() callbacks that were enqueued before the
+// poll began, then waits out the remainder of the poll period (or until
+// woken by a new waiter or by Shutdown()).
+void HmsNotificationLogListenerTask::RunLoop() {
+  // Empty on the first iteration; refilled at the bottom of each iteration.
+  vector<StatusCallback> callback_batch;
+  while (true) {
+    Status s = Poll();
+    WARN_NOT_OK(s, "Hive Metastore notification log listener poll failed");
+
+    // Wakeup all threads which enqueued before beginning the poll.
+    for (auto& cb : callback_batch) {
+      cb.Run(s);
+    }
+
+    {
+      std::lock_guard<Mutex> l(lock_);
+
+      // Check if shutdown was signaled while polling.
+      if (closing_) {
+        callback_batch = std::move(catch_up_callbacks_);
+        break;
+      }
+
+      // Check if a waiter thread enqueued while polling. If not, then wait for
+      // up to a poll period to elapse.
+      if (catch_up_callbacks_.empty()) {
+        wake_up_cv_.WaitFor(
+          MonoDelta::FromSeconds(FLAGS_hive_metastore_notification_log_poll_period_seconds));
+      }
+
+      // Swap the current queue of callbacks, so they can be completed after
+      // polling next iteration.
+      callback_batch = std::move(catch_up_callbacks_);
+
+      // Check if shutdown was signaled while waiting.
+      if (closing_) {
+        break;
+      }
+    }
+  }
+
+  // Fail any callbacks that were still pending when shutdown was observed.
+  for (auto& cb : callback_batch) {
+    cb.Run(Status::ServiceUnavailable(kShutdownMessage));
+  }
+}
+
+namespace {
+
+// Returns a text string appropriate for debugging a notification event.
+string EventDebugString(const hive::NotificationEvent& event) {
+  return Substitute("$0 $1 $2.$3", event.eventId, event.eventType, event.dbName, event.tableName);
+}
+
+// Parses the event message from a notification event. See
+// org.apache.hadoop.hive.metastore.messaging.MessageFactory for more info.
+//
+// Since JSONMessageFactory is currently the only concrete implementation of
+// MessageFactory, this method is specialized to return the Document type. If
+// another MessageFactory instance becomes used in the future this method should
+// be updated to handle it accordingly.
+Status ParseMessage(const hive::NotificationEvent& event, Document* message) {
+  if (event.messageFormat != "json-0.2") {
+    return Status::NotSupported("unknown message format", event.messageFormat);
+  }
+  if (message->Parse<0>(event.message.c_str()).HasParseError()) {
+    return Status::Corruption("failed to parse message", message->GetParseError());
+  }
+  return Status::OK();
+}
+
+// Deserializes an HMS table object from the 'key' field of a JSON
+// notification log message.
+//
+// Note: the 'event' parameter was previously declared with a parenthesized
+// declarator ('const hive::NotificationEvent(event)'), which silently passed
+// the heavy Thrift struct by value; it is now taken by const reference.
+Status DeserializeTable(const hive::NotificationEvent& event,
+                        const Document& message,
+                        const char* key,
+                        hive::Table* table) {
+  if (!message.HasMember(key)) {
+    return Status::Corruption("field is not present", key);
+  }
+  if (!message[key].IsString()) {
+    return Status::Corruption("field is not a string", key);
+  }
+
+  const Value& value = message[key];
+  Slice slice(value.GetString(), value.GetStringLength());
+  return hms::HmsClient::DeserializeJsonTable(slice, table);
+}
+} // anonymous namespace
+
+// Performs a single round of polling: reads batches of notification log
+// events from the HMS, applies relevant ALTER_TABLE and DROP_TABLE events to
+// the Kudu catalog, then best-effort persists the latest processed event ID
+// to the sys catalog.
+Status HmsNotificationLogListenerTask::Poll() {
+  if (!catalog_manager_) {
+    SleepFor(MonoDelta::FromMilliseconds(
+          FLAGS_hive_metastore_notification_log_poll_inject_latency_ms));
+    // Unit-test mode.
+    return Status::OK();
+  }
+
+  // This method calls the catalog manager directly, so ensure the leader lock is held.
+  CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
+  if (!l.first_failed_status().ok()) {
+    LOG(INFO) << "Skipping Hive Metastore notification log poll: "
+              << l.first_failed_status().ToString();
+    return Status::OK();
+  }
+
+  // Cache the batch size, since it's a runtime flag.
+  int32_t batch_size = FLAGS_hive_metastore_notification_log_batch_size;
+
+  // Retrieve the last processed event ID from the catalog manager. The latest
+  // event ID is requested for every call to Poll() because leadership may have
+  // changed, and another leader may have processed events.
+  int64_t durable_event_id = catalog_manager_->GetLatestNotificationLogEventId();
+
+  // Also keep track of the latest event ID which has been processed locally.
+  int64_t processed_event_id = durable_event_id;
+  vector<hive::NotificationEvent> events;
+  while (true) {
+    events.clear();
+
+    {
+      // Named 'guard' so as not to shadow the leader shared lock 'l' above,
+      // which is used again after this scope.
+      std::lock_guard<Mutex> guard(lock_);
+      if (closing_) {
+        return Status::ServiceUnavailable(kShutdownMessage);
+      }
+    }
+
+    RETURN_NOT_OK_PREPEND(catalog_manager_->HmsCatalog()->GetNotificationEvents(processed_event_id,
+                                                                                batch_size,
+                                                                                &events),
+                          "failed to retrieve notification log events");
+
+    for (const auto& event : events) {
+      VLOG(1) << "Processing notification log event: " << EventDebugString(event);
+
+      // Check for out-of-order events. Out-of-order events are skipped, since
+      // refusing to process them by returning early would result in the
+      // notification log listener indefinitely short-circuiting on the same
+      // invalid event.
+      if (event.eventId <= processed_event_id) {
+        LOG(DFATAL) << "Received out-of-order notification log event "
+                    << "(last processed event ID: " << processed_event_id << "): "
+                    << EventDebugString(event);
+        continue;
+      }
+
+      Status s;
+      if (event.eventType == "ALTER_TABLE") {
+        s = HandleAlterTableEvent(event, &durable_event_id);
+      } else if (event.eventType == "DROP_TABLE") {
+        s = HandleDropTableEvent(event, &durable_event_id);
+      }
+
+      // Failing to properly handle a notification is not a fatal error, instead
+      // we continue processing notifications. Callers of WaitForCatchUp have no
+      // way of indicating which specific notification they are waiting for, and
+      // returning early with error pertaining to a different notifications
+      // could result in not waiting long enough.
+      //
+      // Consider a CREATE TABLE call which succeeds in adding an entry to the
+      // HMS, but fails to write to the sys catalog, because leadership has been
+      // lost. In this case a rollback attempt will occur, and the entry will be
+      // deleted from the HMS. When the notification for that delete is
+      // processed by the listener, it will necessarily fail to apply, since the
+      // table never existed in Kudu. It's critical that in cases like this
+      // the notification log listener continues to make progress.
+      //
+      // TODO(KUDU-2475): Ignoring errors could result in a client receiving an
+      // ack for a table rename or drop which fails.
+      WARN_NOT_OK(s, Substitute("Failed to handle Hive Metastore notification: $0",
+                                 EventDebugString(event)));
+
+      // Short-circuit when leadership is lost to prevent applying notification
+      // events out of order.
+      if (l.has_term_changed()) {
+        return Status::ServiceUnavailable(
+            "lost leadership while handling Hive Metastore notification log events", s.message());
+      }
+
+      processed_event_id = event.eventId;
+    }
+
+    // If the last set of events was smaller than the batch size then we can
+    // assume that we've read all of the available events. The cast avoids a
+    // signed/unsigned comparison; batch_size is assumed to be positive.
+    if (events.size() < static_cast<size_t>(batch_size)) break;
+  }
+
+  // The durable event ID gets updated every time we make a change in response
+  // to a log notification, however not every log notification results in a
+  // change (for instance, a notification pertaining to a Parquet table). To
+  // avoid replaying these notifications we persist the latest processed
+  // notification log event ID after polling. This is best effort, since failing
+  // to update the ID should only results in wasted work, not an unsynchronized
+  // catalog.
+  if (durable_event_id < processed_event_id) {
+    WARN_NOT_OK(catalog_manager_->StoreLatestNotificationLogEventId(processed_event_id),
+                "failed to record latest processed Hive Metastore notification log ID");
+  }
+
+  return Status::OK();
+}
+
+// Handles an ALTER_TABLE notification log event. Only table renames of Kudu
+// tables are applied to the Kudu catalog; all other alterations are ignored.
+Status HmsNotificationLogListenerTask::HandleAlterTableEvent(const hive::NotificationEvent& event,
+                                                             int64_t* durable_event_id) {
+  Document message;
+  RETURN_NOT_OK(ParseMessage(event, &message));
+
+  hive::Table before_table;
+  RETURN_NOT_OK(DeserializeTable(event, message, "tableObjBeforeJson", &before_table));
+
+  const string* storage_handler =
+      FindOrNull(before_table.parameters, hms::HmsClient::kStorageHandlerKey);
+  if (!storage_handler || *storage_handler != hms::HmsClient::kKuduStorageHandler) {
+    // Not a Kudu table; skip it.
+    return Status::OK();
+  }
+
+  hive::Table after_table;
+  RETURN_NOT_OK(DeserializeTable(event, message, "tableObjAfterJson", &after_table));
+
+  // Double check that the Kudu HMS plugin is enforcing storage handler and
+  // table ID constraints correctly.
+  //
+  // Fixed: this lookup previously read 'before_table.parameters', comparing
+  // the pre-alter value against itself and making the check vacuous; it must
+  // inspect the post-alter table metadata.
+  const string* after_storage_handler =
+      FindOrNull(after_table.parameters, hms::HmsClient::kStorageHandlerKey);
+  if (!after_storage_handler || *after_storage_handler != *storage_handler) {
+    return Status::IllegalState("storage handler property altered");
+  }
+
+  const string* table_id = FindOrNull(before_table.parameters, hms::HmsClient::kKuduTableIdKey);
+  if (!table_id) {
+    return Status::IllegalState("missing Kudu table ID");
+  }
+  const string* after_table_id = FindOrNull(after_table.parameters,
+                                            hms::HmsClient::kKuduTableIdKey);
+  if (!after_table_id || *after_table_id != *table_id) {
+    return Status::IllegalState("Kudu table ID altered");
+  }
+
+  string before_table_name = Substitute("$0.$1", before_table.dbName, before_table.tableName);
+  string after_table_name = Substitute("$0.$1", event.dbName, event.tableName);
+
+  if (before_table_name == after_table_name) {
+    VLOG(2) << "Ignoring non-rename alter table event on table "
+            << *table_id << " " << before_table_name;
+    return Status::OK();
+  }
+
+  RETURN_NOT_OK(catalog_manager_->RenameTableHms(*table_id, before_table_name,
+                                                 after_table_name, event.eventId));
+  *durable_event_id = event.eventId;
+  return Status::OK();
+}
+
+// Handles a DROP_TABLE notification log event by deleting the corresponding
+// table from the Kudu catalog, if the event describes a Kudu table.
+Status HmsNotificationLogListenerTask::HandleDropTableEvent(const hive::NotificationEvent& event,
+                                                            int64_t* durable_event_id) {
+  Document message;
+  RETURN_NOT_OK(ParseMessage(event, &message));
+
+  hive::Table table;
+  RETURN_NOT_OK(DeserializeTable(event, message, "tableObjJson", &table));
+
+  const string* storage_handler = FindOrNull(table.parameters, hms::HmsClient::kStorageHandlerKey);
+  if (!storage_handler || *storage_handler != hms::HmsClient::kKuduStorageHandler) {
+    // Not a Kudu table; skip it.
+    return Status::OK();
+  }
+
+  const string* table_id = FindOrNull(table.parameters, hms::HmsClient::kKuduTableIdKey);
+  if (!table_id) {
+    return Status::IllegalState("missing Kudu table ID");
+  }
+
+  // Require the table ID *and* table name from the HMS drop event to match the
+  // Kudu catalog's metadata for the table. Checking the name in addition to the
+  // ID prevents a table from being dropped while the HMS and Kudu catalogs are
+  // unsynchronized. If the catalogs are unsynchronized, it's better to return
+  // an error than liberally delete data.
+  string table_name = Substitute("$0.$1", event.dbName, event.tableName);
+  RETURN_NOT_OK(catalog_manager_->DeleteTableHms(table_name, *table_id, event.eventId));
+  *durable_event_id = event.eventId;
+  return Status::OK();
+}
+
+} // namespace master
+} // namespace kudu


Mime
View raw message