kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject kudu git commit: KUDU-2191 (8/n): Integrate HmsCatalog into CatalogManager
Date Tue, 03 Apr 2018 19:24:14 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 5fc31db00 -> 02560c188


KUDU-2191 (8/n): Integrate HmsCatalog into CatalogManager

This commit connects the CatalogManager to the HMS via the HmsCatalog
abstraction. When the HMS integration is enabled (by setting the
--hive-metastore-uris flag) and Kudu tables are created, altered, or
dropped, the corresponding HMS entry is also modified appropriately.
Additionally, new table and column names are required to be valid
according to the Hive identifier rules, which are much stricter than
Kudu's existing identifier rules.

Testing: This commit adds a new integration test (master_hms-itest)
which tests that the integration works as expected with
create/alter/drop table operations. Additionally, some existing DDL
stress tests now have the HMS integration enabled in order to provide
more coverage.

Change-Id: Ie68e143c3c317c7690af097e6485934feb1010b4
Reviewed-on: http://gerrit.cloudera.org:8080/9863
Reviewed-by: Dan Burkert <danburkert@apache.org>
Tested-by: Dan Burkert <danburkert@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/02560c18
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/02560c18
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/02560c18

Branch: refs/heads/master
Commit: 02560c188f6b76714ee1908420e539f28bce99e8
Parents: 5fc31db
Author: Dan Burkert <danburkert@apache.org>
Authored: Wed Mar 28 16:45:35 2018 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Tue Apr 3 19:23:58 2018 +0000

----------------------------------------------------------------------
 build-support/iwyu/iwyu-filter.awk              |   6 +
 src/kudu/hms/hms_catalog.cc                     |  62 +--
 src/kudu/hms/hms_catalog.h                      |  11 +-
 src/kudu/hms/hms_client.h                       |   6 +-
 src/kudu/hms/mini_hms.h                         |   2 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../alter_table-randomized-test.cc              |   3 +-
 .../integration-tests/master-stress-test.cc     |   7 +-
 .../integration-tests/master_failover-itest.cc  |  43 +-
 src/kudu/integration-tests/master_hms-itest.cc  | 394 +++++++++++++++++++
 src/kudu/master/CMakeLists.txt                  |   3 +-
 src/kudu/master/catalog_manager.cc              | 184 +++++++--
 src/kudu/master/catalog_manager.h               |   9 +-
 src/kudu/master/master.proto                    |   3 +
 src/kudu/mini-cluster/external_mini_cluster.cc  |  10 +
 15 files changed, 655 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/build-support/iwyu/iwyu-filter.awk
----------------------------------------------------------------------
diff --git a/build-support/iwyu/iwyu-filter.awk b/build-support/iwyu/iwyu-filter.awk
index c20f2d1..42f747f 100644
--- a/build-support/iwyu/iwyu-filter.awk
+++ b/build-support/iwyu/iwyu-filter.awk
@@ -90,7 +90,13 @@ BEGIN {
   muted["kudu/common/encoded_key-test.cc"]
   muted["kudu/common/schema.h"]
   muted["kudu/experiments/rwlock-perf.cc"]
+  muted["kudu/hms/hms_catalog.cc"]
+  muted["kudu/hms/hms_catalog.h"]
   muted["kudu/hms/hms_client.cc"]
+  muted["kudu/hms/hms_client.h"]
+  muted["kudu/hms/mini_hms.h"]
+  muted["kudu/master/catalog_manager.cc"]
+  muted["kudu/master/catalog_manager.h"]
   muted["kudu/rpc/reactor.cc"]
   muted["kudu/rpc/reactor.h"]
   muted["kudu/security/ca/cert_management.cc"]

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/hms/hms_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 3cfbd90..a6433b3 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -17,13 +17,13 @@
 
 #include "kudu/hms/hms_catalog.h"
 
-#include <algorithm> // IWYU pragma: keep
-#include <functional> // IWYU pragma: keep
+#include <algorithm>
+#include <functional>
 #include <iostream>
-#include <map> // IWYU pragma: keep
+#include <map>
 #include <string>
-#include <type_traits> // IWYU pragma: keep
-#include <utility> // IWYU pragma: keep
+#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -33,16 +33,16 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/strings/split.h" // IWYU pragma: keep
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hive_metastore_constants.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/threadpool.h" // IWYU pragma: keep
+#include "kudu/util/threadpool.h"
 
 using std::string;
 using std::vector;
@@ -273,8 +273,8 @@ Status HmsCatalog::AlterTable(const string& id,
 
 template<typename Task>
 Status HmsCatalog::Execute(Task task) {
-  Synchronizer s;
-  auto callback = s.AsStdStatusCallback();
+  Synchronizer synchronizer;
+  auto callback = synchronizer.AsStdStatusCallback();
 
   // TODO(todd): wrapping this in a TRACE_EVENT scope and a LOG_IF_SLOW and such
   // would be helpful. Perhaps a TRACE message and/or a TRACE_COUNTER_INCREMENT
@@ -284,8 +284,6 @@ Status HmsCatalog::Execute(Task task) {
   // LOG_IF_SLOW calls internally.
 
   RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
-    Status s;
-
     // The main run routine of the threadpool thread. Runs the task with
     // exclusive access to the HMS client. If the task fails, it will be
     // retried, unless the failure type is non-retriable or the maximum number
@@ -312,16 +310,22 @@ Status HmsCatalog::Execute(Task task) {
     //
     // * Task results in a non-fatal error - a non-fatal error is an application
     // level error, and causes the task to be failed immediately (no retries).
-    for (int i = 0; i <= FLAGS_hive_metastore_retry_count; i++) {
+
+    // Keep track of the first attempt's failure. Typically the first failure is
+    // the most informative.
+    Status first_failure;
+
+    for (int attempt = 0; attempt <= FLAGS_hive_metastore_retry_count; attempt++) {
       if (!hms_client_.IsConnected()) {
         if (reconnect_after_ > MonoTime::Now()) {
           // Not yet ready to attempt reconnection; fail the task immediately.
+          DCHECK(!reconnect_failure_.ok());
           return callback(reconnect_failure_);
         }
 
         // Attempt to reconnect.
-        Status s = Reconnect();
-        if (!s.ok()) {
+        Status reconnect_status = Reconnect();
+        if (!reconnect_status.ok()) {
           // Reconnect failed; retry with exponential backoff capped at 10s and
           // fail the task. We don't bother with jitter here because only the
           // leader master should be attempting this in any given period per
@@ -330,7 +334,7 @@ Status HmsCatalog::Execute(Task task) {
           reconnect_after_ = MonoTime::Now() +
               std::min(MonoDelta::FromMilliseconds(100 << consecutive_reconnect_failures_),
                        MonoDelta::FromSeconds(10));
-          reconnect_failure_ = std::move(s);
+          reconnect_failure_ = std::move(reconnect_status);
           return callback(reconnect_failure_);
         }
 
@@ -338,27 +342,34 @@ Status HmsCatalog::Execute(Task task) {
       }
 
       // Execute the task.
-      s = task(&hms_client_);
+      Status task_status = task(&hms_client_);
 
       // If the task succeeds, or it's a non-retriable error, return the result.
-      if (s.ok() || !IsFatalError(s)) {
-        return callback(s);
+      if (task_status.ok() || !IsFatalError(task_status)) {
+        return callback(task_status);
       }
 
       // A fatal error occurred. Tear down the connection, and try again. We
       // don't log loudly here because odds are the reconnection will fail if
       // it's a true fault, at which point we do log loudly.
-      VLOG(1) << "Call to HMS failed: " << s.ToString();
+      VLOG(1) << "Call to HMS failed: " << task_status.ToString();
+
+      if (attempt == 0) {
+        first_failure = std::move(task_status);
+      }
+
       WARN_NOT_OK(hms_client_.Stop(), "Failed to stop Hive Metastore client");
     }
 
     // We've exhausted the allowed retries.
+    DCHECK(!first_failure.ok());
     LOG(WARNING) << "Call to HMS failed after " << FLAGS_hive_metastore_retry_count
-                 << " retries: " << s.ToString();
-    return callback(s);
+                 << " retries: " << first_failure.ToString();
+
+    return callback(first_failure);
   }));
 
-  return s.Wait();
+  return synchronizer.Wait();
 }
 
 Status HmsCatalog::Reconnect() {
@@ -426,6 +437,7 @@ string column_to_field_type(const ColumnSchema& column) {
     case UNIXTIME_MICROS: return "timestamp";
     default: LOG(FATAL) << "unhandled column type: " << column.TypeToString();
   }
+  __builtin_unreachable();
 }
 
 hive::FieldSchema column_to_field(const ColumnSchema& column) {
@@ -522,5 +534,9 @@ bool HmsCatalog::ValidateUris(const char* flag_name, const string& metastore_uri
   return s.ok();
 }
 
+bool HmsCatalog::IsEnabled() {
+  return !FLAGS_hive_metastore_uris.empty();
+}
+
 } // namespace hms
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/hms/hms_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index fbd2839..f8768f4 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -18,15 +18,15 @@
 #pragma once
 
 #include <string>
-#include <vector> // IWYU pragma: keep
+#include <vector>
 
 #include <gtest/gtest_prod.h>
 
-#include "kudu/gutil/gscoped_ptr.h" // IWYU pragma: keep
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 
 namespace hive {
@@ -36,7 +36,7 @@ class Table;
 namespace kudu {
 
 class Schema;
-class ThreadPool; // IWYU pragma: keep
+class ThreadPool;
 
 namespace hms {
 
@@ -87,6 +87,9 @@ class HmsCatalog {
   // Validates the Hive Metastore SASL gflags.
   static bool ValidateSasl();
 
+  // Returns true if the HMS Catalog should be enabled.
+  static bool IsEnabled();
+
  private:
 
   FRIEND_TEST(HmsCatalogStaticTest, TestParseTableName);

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index cafa417..38e9ab5 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -19,7 +19,7 @@
 
 #include <cstdint>
 #include <string>
-#include <vector> // IWYU pragma: keep
+#include <vector>
 
 #include "kudu/gutil/port.h"
 #include "kudu/hms/ThriftHiveMetastore.h"
@@ -30,8 +30,8 @@
 namespace hive {
 class Database;
 class EnvironmentContext;
-class NotificationEvent; // IWYU pragma: keep
-class Partition; // IWYU pragma: keep
+class NotificationEvent;
+class Partition;
 class Table;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/hms/mini_hms.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 9b76780..aa0ef4e 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -28,7 +28,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
-#include "kudu/util/subprocess.h" // IWYU pragma: keep
+#include "kudu/util/subprocess.h"
 
 namespace kudu {
 namespace hms {

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 0c77216..8384ee4 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -85,6 +85,7 @@ ADD_KUDU_TEST(linked_list-test RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true)
 ADD_KUDU_TEST(log-rolling-itest)
 ADD_KUDU_TEST(master_cert_authority-itest RESOURCE_LOCK "master-rpc-ports" PROCESSORS 2)
 ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports" PROCESSORS 3)
+ADD_KUDU_TEST(master_hms-itest RUN_SERIAL true PROCESSORS 4)
 ADD_KUDU_TEST(master_migration-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest
   kudu)

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/integration-tests/alter_table-randomized-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-randomized-test.cc b/src/kudu/integration-tests/alter_table-randomized-test.cc
index 1b359b7..ceda666 100644
--- a/src/kudu/integration-tests/alter_table-randomized-test.cc
+++ b/src/kudu/integration-tests/alter_table-randomized-test.cc
@@ -76,7 +76,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::SubstituteAndAppend;
 
-const char* kTableName = "test-table";
+const char* kTableName = "default.test_table";
 const int kMaxColumns = 30;
 const uint32_t kMaxRangePartitions = 32;
 const vector<KuduColumnStorageAttributes::CompressionType> kCompressionTypes =
@@ -99,6 +99,7 @@ class AlterTableRandomized : public KuduTest {
 
     ExternalMiniClusterOptions opts;
     opts.num_tablet_servers = 3;
+    opts.enable_hive_metastore = true;
     // This test produces tables with lots of columns. With container preallocation,
     // we end up using quite a bit of disk space. So, we disable it.
     opts.extra_tserver_flags.emplace_back("--log_container_preallocate_bytes=0");

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/integration-tests/master-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master-stress-test.cc b/src/kudu/integration-tests/master-stress-test.cc
index 3ad044d..2fb06cf 100644
--- a/src/kudu/integration-tests/master-stress-test.cc
+++ b/src/kudu/integration-tests/master-stress-test.cc
@@ -107,6 +107,11 @@ class MasterStressTest : public KuduTest {
     opts.num_masters = opts.master_rpc_ports.size();
     opts.num_tablet_servers = 3;
 
+    // TODO(dan): enable HMS integration. Currently the test fails when it's
+    // enabled because the HMS and Kudu catalogs become unsynchronized when
+    // masters are killed while operating on the catalog.
+    // opts.enable_hive_metastore = true;
+
     // Don't preallocate log segments, since we're creating many tablets here.
     // If each preallocates 64M or so, we use a ton of disk space in this
     // test, and it fails on normal sized /tmp dirs.
@@ -335,7 +340,7 @@ class MasterStressTest : public KuduTest {
 
  private:
   string GenerateTableName() {
-    return Substitute("table-$0", oid_generator_.Next());
+    return Substitute("default.table_$0", oid_generator_.Next());
   }
 
   bool BlockingGetTableName(string* chosen_table) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/integration-tests/master_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_failover-itest.cc b/src/kudu/integration-tests/master_failover-itest.cc
index a0015c4..3cf4aa2 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdint>
+#include <cstdint> // IWYU pragma: keep
 #include <memory>
-#include <ostream>
+#include <ostream> // IWYU pragma: keep
 #include <set>
 #include <string>
 #include <vector>
@@ -25,23 +25,23 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/client/client-test-util.h"
+#include "kudu/client/client-test-util.h" // IWYU pragma: keep
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/split.h"
-#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/strip.h" // IWYU pragma: keep
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/master/sys_catalog.h"
+#include "kudu/master/sys_catalog.h" // IWYU pragma: keep
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/net_util.h" // IWYU pragma: keep
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
-#include "kudu/util/subprocess.h"
+#include "kudu/util/subprocess.h" // IWYU pragma: keep
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -81,6 +81,7 @@ class MasterFailoverTest : public KuduTest {
     opts_.master_rpc_ports = { 11010, 11011, 11012 };
     opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size();
     opts_.num_tablet_servers = kNumTabletServerReplicas;
+    opts_.enable_hive_metastore = true;
 
     // Reduce various timeouts below as to make the detection of
     // leader master failures (specifically, failures as result of
@@ -160,7 +161,7 @@ class MasterFailoverTest : public KuduTest {
 // wait until the table has been created) works even when the original
 // leader master has been paused.
 TEST_F(MasterFailoverTest, TestCreateTableSync) {
-  const char* kTableName = "testCreateTableSync";
+  const char* kTableName = "default.testCreateTableSync";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -182,7 +183,7 @@ TEST_F(MasterFailoverTest, TestCreateTableSync) {
   // 5. Client times out, finds the new master, and retries CreateTable().
   // 6. The retry fails because the table was already created in step 3.
   Status s = CreateTable(kTableName, kWaitForCreate);
-  ASSERT_TRUE(s.ok() || s.IsAlreadyPresent());
+  ASSERT_TRUE(s.ok() || s.IsAlreadyPresent()) << s.ToString();
 
   shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(kTableName, &table));
@@ -193,7 +194,7 @@ TEST_F(MasterFailoverTest, TestCreateTableSync) {
 // immediately after, then verify that the table has been created on
 // the newly elected leader master.
 TEST_F(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
-  const char* kTableName = "testPauseAfterCreateTableIssued";
+  const char* kTableName = "default.testPauseAfterCreateTableIssued";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -224,7 +225,7 @@ TEST_F(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
 // and then issue the DeleteTable call: DeleteTable should go to the newly
 // elected leader master and succeed.
 TEST_F(MasterFailoverTest, TestDeleteTableSync) {
-  const char* kTableName = "testDeleteTableSync";
+  const char* kTableName = "default.testDeleteTableSync";
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
     return;
@@ -241,11 +242,11 @@ TEST_F(MasterFailoverTest, TestDeleteTableSync) {
   // It's possible for DeleteTable() to delete the table and still return
   // NotFound. See TestCreateTableSync for details.
   Status s = client_->DeleteTable(kTableName);
-  ASSERT_TRUE(s.ok() || s.IsNotFound());
+  ASSERT_TRUE(s.ok() || s.IsNotFound()) << s.ToString();
 
   shared_ptr<KuduTable> table;
   s = client_->OpenTable(kTableName, &table);
-  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 }
 
 // Test the scenario where we create a table, pause the leader master,
@@ -253,11 +254,11 @@ TEST_F(MasterFailoverTest, TestDeleteTableSync) {
 // should go to the newly elected leader master and succeed, renaming
 // the table.
 //
-// TODO: Add an equivalent async test. Add a test for adding and/or
+// TODO(unknown): Add an equivalent async test. Add a test for adding and/or
 // renaming a column in a table.
 TEST_F(MasterFailoverTest, TestRenameTableSync) {
-  const char* kTableNameOrig = "testAlterTableSync";
-  const char* kTableNameNew = "testAlterTableSyncRenamed";
+  const char* kTableNameOrig = "default.testAlterTableSync";
+  const char* kTableNameNew = "default.testAlterTableSyncRenamed";
 
   if (!AllowSlowTests()) {
     LOG(INFO) << "This test can only be run in slow mode.";
@@ -275,17 +276,17 @@ TEST_F(MasterFailoverTest, TestRenameTableSync) {
   // It's possible for AlterTable() to rename the table and still return
   // NotFound. See TestCreateTableSync for details.
   Status s = RenameTable(kTableNameOrig, kTableNameNew);
-  ASSERT_TRUE(s.ok() || s.IsNotFound());
+  ASSERT_TRUE(s.ok() || s.IsNotFound()) << s.ToString();
 
   shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(kTableNameNew, &table));
   s = client_->OpenTable(kTableNameOrig, &table);
-  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 }
 
 
 TEST_F(MasterFailoverTest, TestKUDU1374) {
-  const char* kTableName = "testKUDU1374";
+  const char* kTableName = "default.testKUDU1374";
 
   // Wait at least one additional heartbeat interval after creating the table.
   // The idea is to guarantee that all tservers sent a tablet report with the
@@ -451,7 +452,7 @@ TEST_F(MasterFailoverTest, TestMasterPermanentFailure) {
 
     // Do some operations.
 
-    string table_name = Substitute("table-$0", i);
+    string table_name = Substitute("default.table_$0", i);
     ASSERT_OK(CreateTable(table_name, kWaitForCreate));
 
     shared_ptr<KuduTable> table;
@@ -465,7 +466,7 @@ TEST_F(MasterFailoverTest, TestMasterPermanentFailure) {
       for (int j = 0; j < cluster_->num_masters(); j++) {
         ASSERT_OK(cluster_->master(j)->Pause());
         ScopedResumeExternalDaemon resume_daemon(cluster_->master(j));
-        string table_name = Substitute("table-$0-$1", i, j);
+        string table_name = Substitute("default.table_$0_$1", i, j);
 
         // See TestCreateTableSync to understand why we must check for
         // IsAlreadyPresent as well.

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/integration-tests/master_hms-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
new file mode 100644
index 0000000..06fabec
--- /dev/null
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm> // IWYU pragma: keep
+#include <map> // IWYU pragma: keep
+#include <memory>
+#include <ostream> // IWYU pragma: keep
+#include <string>
+#include <vector>
+
+#include <glog/stl_logging.h> // IWYU pragma: keep
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h" // IWYU pragma: keep
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/hms/hive_metastore_constants.h"
+#include "kudu/hms/hive_metastore_types.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/hms/mini_hms.h" // IWYU pragma: keep
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/decimal_util.h"
+#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+
+using client::KuduColumnSchema;
+using client::KuduSchema;
+using client::KuduSchemaBuilder;
+using client::KuduTable;
+using client::KuduTableAlterer;
+using client::KuduTableCreator;
+using client::sp::shared_ptr;
+using cluster::ExternalMiniClusterOptions;
+using hms::HmsClient;
+using hms::HmsClientOptions;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+// Test Master <-> HMS catalog synchronization.
+class MasterHmsTest : public ExternalMiniClusterITestBase {
+ public:
+
+  void SetUp() override {
+    ExternalMiniClusterITestBase::SetUp();
+
+    ExternalMiniClusterOptions opts;
+    opts.enable_hive_metastore = true;
+    opts.num_masters = 1;
+    opts.num_tablet_servers = 1;
+    StartClusterWithOpts(std::move(opts));
+
+    hms_client_.reset(new HmsClient(cluster_->hms()->address(), HmsClientOptions()));
+    ASSERT_OK(hms_client_->Start());
+  }
+
+  void TearDown() override {
+    ASSERT_OK(hms_client_->Stop());
+    ExternalMiniClusterITestBase::TearDown();
+  }
+
+  Status StopHms() {
+    RETURN_NOT_OK(hms_client_->Stop());
+    RETURN_NOT_OK(cluster_->hms()->Stop());
+    return Status::OK();
+  }
+
+  Status StartHms() {
+    RETURN_NOT_OK(cluster_->hms()->Start());
+    RETURN_NOT_OK(hms_client_->Start());
+    return Status::OK();
+  }
+
+  Status RestartHms() {
+    RETURN_NOT_OK(StopHms());
+    RETURN_NOT_OK(StartHms());
+    return Status::OK();
+  }
+
+  Status CreateDatabase(const string& database_name) {
+    hive::Database db;
+    db.name = database_name;
+    RETURN_NOT_OK(hms_client_->CreateDatabase(db));
+    // Sanity check that the DB is created.
+    RETURN_NOT_OK(hms_client_->GetDatabase(database_name, &db));
+    return Status::OK();
+  }
+
+  Status CreateKuduTable(const string& database_name, const string& table_name) {
+    // Get coverage of all column types.
+    KuduSchema schema;
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
+    b.AddColumn("int16_val")->Type(KuduColumnSchema::INT16);
+    b.AddColumn("int32_val")->Type(KuduColumnSchema::INT32);
+    b.AddColumn("int64_val")->Type(KuduColumnSchema::INT64);
+    b.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
+    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
+    b.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL);
+    b.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
+    b.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
+    b.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
+    b.AddColumn("decimal32_val")->Type(KuduColumnSchema::DECIMAL)
+        ->Precision(kMaxDecimal32Precision);
+    b.AddColumn("decimal64_val")->Type(KuduColumnSchema::DECIMAL)
+        ->Precision(kMaxDecimal64Precision);
+    b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
+        ->Precision(kMaxDecimal128Precision);
+
+    RETURN_NOT_OK(b.Build(&schema));
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    return table_creator->table_name(Substitute("$0.$1", database_name, table_name))
+                         .schema(&schema)
+                         .num_replicas(1)
+                         .set_range_partition_columns({ "key" })
+                         .Create();
+  }
+
+  // Checks that the Kudu table schema and the HMS table entry in their
+  // respective catalogs are synchronized for a particular table.
+  void CheckTable(const string& database_name, const string& table_name) {
+    SCOPED_TRACE(Substitute("Checking table $0.$1", database_name, table_name));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
+    KuduSchema schema = table->schema();
+
+    hive::Table hms_table;
+    ASSERT_OK(hms_client_->GetTable(database_name, table_name, &hms_table));
+
+    ASSERT_EQ(schema.num_columns(), hms_table.sd.cols.size());
+    for (int idx = 0; idx < schema.num_columns(); idx++) {
+      ASSERT_EQ(schema.Column(idx).name(), hms_table.sd.cols[idx].name);
+    }
+    ASSERT_EQ(table->id(), hms_table.parameters[hms::HmsClient::kKuduTableIdKey]);
+    ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs()),
+              hms_table.parameters[hms::HmsClient::kKuduMasterAddrsKey]);
+    ASSERT_EQ(hms::HmsClient::kKuduStorageHandler,
+              hms_table.parameters[hive::g_hive_metastore_constants.META_TABLE_STORAGE]);
+  }
+
+  // Checks that a table does not exist in the Kudu and HMS catalogs.
+  void CheckTableDoesNotExist(const string& database_name, const string& table_name) {
+    SCOPED_TRACE(Substitute("Checking table $0.$1 does not exist", database_name, table_name));
+
+    shared_ptr<KuduTable> table;
+    Status s = client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+
+    hive::Table hms_table;
+    s = hms_client_->GetTable(database_name, table_name, &hms_table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+ protected:
+
+  unique_ptr<HmsClient> hms_client_;
+};
+
+TEST_F(MasterHmsTest, TestCreateTable) {
+  const char* hms_database_name = "create_db";
+  const char* hms_table_name = "table";
+  string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
+
+  // Attempt to create the table before the database is created.
+  Status s = CreateKuduTable(hms_database_name, hms_table_name);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+
+  ASSERT_OK(CreateDatabase(hms_database_name));
+
+  // Create a table entry with the name.
+  hive::Table hms_table;
+  hms_table.dbName = hms_database_name;
+  hms_table.tableName = hms_table_name;
+  ASSERT_OK(hms_client_->CreateTable(hms_table));
+
+  // Attempt to create a Kudu table with the same name.
+  s = CreateKuduTable(hms_database_name, hms_table_name);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "table already exists");
+
+  // Attempt to create a Kudu table with an invalid table name.
+  s = CreateKuduTable(hms_database_name, "☃");
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "☃ is not a valid object name");
+
+  // Drop the HMS entry and create the table through Kudu.
+  ASSERT_OK(hms_client_->DropTableWithContext(hms_database_name, hms_table_name,
+                                              hive::EnvironmentContext()));
+  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+
+  // Shutdown the HMS and try to create a table.
+  ASSERT_OK(StopHms());
+
+  s = CreateKuduTable(hms_database_name, "foo");
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Start the HMS and try again.
+  ASSERT_OK(StartHms());
+  ASSERT_OK(CreateKuduTable(hms_database_name, "foo"));
+  NO_FATALS(CheckTable(hms_database_name, "foo"));
+}
+
+TEST_F(MasterHmsTest, TestRenameTable) {
+  // Create the database and Kudu table.
+  ASSERT_OK(CreateDatabase("db"));
+  ASSERT_OK(CreateKuduTable("db", "a"));
+  NO_FATALS(CheckTable("db", "a"));
+
+  // Create a non-Kudu ('external') HMS table entry.
+  hive::Table external_table;
+  external_table.dbName = "db";
+  external_table.tableName = "b";
+  ASSERT_OK(hms_client_->CreateTable(external_table));
+
+  // Attempt to rename the Kudu table to the external table name.
+  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("db.a"));
+  Status s = table_alterer->RenameTo("db.b")->Alter();
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "b already exists");
+
+  // Attempt to rename the Kudu table to an invalid database/table name pair.
+  table_alterer.reset(client_->NewTableAlterer("db.a"));
+  s = table_alterer->RenameTo("foo")->Alter();
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Kudu table names must be a period ('.') separated "
+                                    "database and table name pair");
+
+  // Attempt to rename the Kudu table to a non-existent database.
+  table_alterer.reset(client_->NewTableAlterer("db.a"));
+  s = table_alterer->RenameTo("non_existent_database.table")->Alter();
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  // TODO(HIVE-18852): match on the error message.
+
+  // Attempt to rename the Kudu table to an invalid table name.
+  table_alterer.reset(client_->NewTableAlterer("db.a"));
+  s = table_alterer->RenameTo("db.☃")->Alter();
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "☃ is not a valid object name");
+
+  // Drop the HMS table entry and rename the table. This tests that the
+  // HmsCatalog will create a new entry when necessary.
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable("db.a", &table));
+  ASSERT_OK(hms_client_->DropTableWithContext("db", "a", hive::EnvironmentContext()));
+  table_alterer.reset(client_->NewTableAlterer("db.a"));
+  ASSERT_OK(table_alterer->RenameTo("db.c")->Alter());
+  NO_FATALS(CheckTable("db", "c"));
+  NO_FATALS(CheckTableDoesNotExist("db", "a"));
+
+  // Shutdown the HMS and try to rename the table.
+  ASSERT_OK(StopHms());
+  table_alterer.reset(client_->NewTableAlterer("db.c"));
+  s = table_alterer->RenameTo("db.a")->Alter();
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Start the HMS and rename the table back to the original name. This is the happy path.
+  ASSERT_OK(StartHms());
+  table_alterer.reset(client_->NewTableAlterer("db.c"));
+  ASSERT_OK(table_alterer->RenameTo("db.a")->Alter());
+  NO_FATALS(CheckTable("db", "a"));
+  NO_FATALS(CheckTableDoesNotExist("db", "c"));
+
+  // Drop the HMS table entry, then create a non-Kudu table entry in its place,
+  // and attempt to rename the table.
+  ASSERT_OK(hms_client_->DropTableWithContext("db", "a", hive::EnvironmentContext()));
+  hive::Table external_table_2;
+  external_table_2.dbName = "db";
+  external_table_2.tableName = "a";
+  ASSERT_OK(hms_client_->CreateTable(external_table_2));
+  table_alterer.reset(client_->NewTableAlterer("db.a"));
+  ASSERT_OK(table_alterer->RenameTo("db.c")->Alter());
+  NO_FATALS(CheckTable("db", "c"));
+
+  // Check that all three tables still exist.
+  vector<string> tables;
+  ASSERT_OK(hms_client_->GetAllTables("db", &tables));
+  std::sort(tables.begin(), tables.end());
+  ASSERT_EQ(tables, vector<string>({ "a", "b", "c" })) << tables;
+}
+
+TEST_F(MasterHmsTest, TestAlterTable) {
+  const char* hms_database_name = "alter_db";
+  const char* hms_table_name = "table";
+  string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
+
+  ASSERT_OK(CreateDatabase(hms_database_name));
+
+  // Create the Kudu table.
+  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+
+  // Alter the HMS table entry in a destructive way (remove the columns).
+  hive::Table hms_table;
+  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table));
+  hms_table.sd.cols.clear();
+  ASSERT_OK(hms_client_->AlterTable(hms_database_name, hms_table_name, hms_table));
+  hive::Table altered_table;
+  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &altered_table));
+  ASSERT_TRUE(altered_table.sd.cols.empty());
+
+  // Drop a column. This should correct the entire set of columns in the HMS.
+  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_name));
+  ASSERT_OK(table_alterer->DropColumn("int8_val")->Alter());
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+
+  // Shutdown the HMS and try to alter the table.
+  ASSERT_OK(StopHms());
+  table_alterer.reset(client_->NewTableAlterer(table_name));
+  Status s = table_alterer->DropColumn("int16_val")->Alter();
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Start the HMS and try again.
+  ASSERT_OK(StartHms());
+  table_alterer.reset(client_->NewTableAlterer(table_name));
+  ASSERT_OK(table_alterer->DropColumn("int16_val")->Alter());
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+
+  // Drop the table from the HMS, and insert a non-Kudu table entry, then try
+  // to alter the table. The alter must fail with an IllegalState status.
+  ASSERT_OK(hms_client_->DropTableWithContext(hms_database_name, hms_table_name,
+                                              hive::EnvironmentContext()));
+  hms_table = hive::Table();
+  hms_table.dbName = hms_database_name;
+  hms_table.tableName = hms_table_name;
+  ASSERT_OK(hms_client_->CreateTable(hms_table));
+
+  table_alterer.reset(client_->NewTableAlterer(table_name));
+  s = table_alterer->DropColumn("int32_val")->Alter();
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "the HMS entry does not belong to the Kudu table");
+}
+
+TEST_F(MasterHmsTest, TestDeleteTable) {
+  const char* hms_database_name = "delete_db";
+  const char* hms_table_name = "table";
+  string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
+
+  ASSERT_OK(CreateDatabase(hms_database_name));
+
+  // Create the Kudu table, then drop it and ensure the HMS entry is removed.
+  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  hive::Table hms_table;
+  ASSERT_OK(hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table));
+  ASSERT_OK(client_->DeleteTable(table_name));
+  NO_FATALS(CheckTableDoesNotExist(hms_database_name, hms_table_name));
+
+  // Create the Kudu table, remove the HMS entry, and ensure the Kudu table can
+  // still be dropped.
+  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(table_name, &table));
+  ASSERT_OK(hms_client_->DropTableWithContext(hms_database_name, hms_table_name,
+                                              hive::EnvironmentContext()));
+  Status s = hms_client_->GetTable(hms_database_name, hms_table_name, &hms_table);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_OK(client_->DeleteTable(table_name));
+  NO_FATALS(CheckTableDoesNotExist(hms_database_name, hms_table_name));
+
+  // Ensure that dropping a table fails while the HMS is down, and succeeds once it is back.
+  ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  ASSERT_OK(StopHms());
+  s = client_->DeleteTable(table_name);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  ASSERT_OK(StartHms());
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  ASSERT_OK(client_->DeleteTable(table_name));
+  NO_FATALS(CheckTableDoesNotExist(hms_database_name, hms_table_name));
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 5e1c528..07c7565 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -68,7 +68,8 @@ set(KUDU_TEST_LINK_LIBS
   kudu_client
   kudu_curl_util
   master
-  master_proto)
+  master_proto
+  mini_hms)
 
 ADD_KUDU_TEST(catalog_manager-test)
 ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port")

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 35ec262..813f661 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -53,6 +53,7 @@
 #include <string>
 #include <type_traits>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -87,10 +88,12 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/gutil/utf/utf.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/hms/hms_catalog.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master_cert_authority.h"
@@ -645,7 +648,7 @@ Status ProcessColumnPBDefaults(ColumnSchemaPB* col) {
 
 } // anonymous namespace
 
-CatalogManager::CatalogManager(Master *master)
+CatalogManager::CatalogManager(Master* master)
   : master_(master),
     rng_(GetRandomSeed32()),
     state_(kConstructed),
@@ -681,6 +684,22 @@ Status CatalogManager::Init(bool is_first_run) {
   RETURN_NOT_OK_PREPEND(sys_catalog_->WaitUntilRunning(),
                         "Failed waiting for the catalog tablet to run");
 
+  if (hms::HmsCatalog::IsEnabled()) {
+    vector<HostPortPB> master_addrs_pb;
+    RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addrs_pb));
+
+    string master_addresses = JoinMapped(
+        master_addrs_pb,
+        [] (const HostPortPB& hostport) {
+          return Substitute("$0:$1", hostport.host(), hostport.port());
+        },
+        ",");
+
+    hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses)));
+    RETURN_NOT_OK_PREPEND(hms_catalog_->Start(),
+                          "failed to start Hive Metastore catalog");
+  }
+
   std::lock_guard<LockType> l(lock_);
   background_tasks_.reset(new CatalogManagerBgTasks(this));
   RETURN_NOT_OK_PREPEND(background_tasks_->Init(),
@@ -1145,6 +1164,10 @@ void CatalogManager::Shutdown() {
     background_tasks_->Shutdown();
   }
 
+  if (hms_catalog_) {
+    hms_catalog_->Stop();
+  }
+
   // Mark all outstanding table tasks as aborted and wait for them to fail.
   //
   // There may be an outstanding table visitor thread modifying the table map,
@@ -1509,7 +1532,31 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
   table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);
 
-  // e. Write table and tablets to sys-catalog.
+  // e. Create the table in the HMS.
+  //
+  // It is critical that this step happen before writing the table to the sys catalog,
+  // since this step validates that the table name is available in the HMS catalog.
+  if (hms_catalog_) {
+    s = hms_catalog_->CreateTable(table->id(), req.name(), schema);
+    if (!s.ok()) {
+      s = s.CloneAndPrepend(Substitute("an error occurred while creating table $0 in the HMS",
+                                       req.name()));
+      LOG(WARNING) << s.ToString();
+      SetupError(resp->mutable_error(), MasterErrorPB::HIVE_METASTORE_ERROR, s);
+      return s;
+    }
+  }
+  // Delete the new HMS entry if we exit early.
+  auto abort_hms = MakeScopedCleanup([&] {
+      // TODO(dan): figure out how to test this.
+      if (hms_catalog_) {
+        TRACE("Rolling back HMS table creation");
+        WARN_NOT_OK(hms_catalog_->DropTable(table->id(), req.name()),
+                    "an error occurred while attempting to delete orphaned HMS table entry");
+      }
+  });
+
+  // f. Write table and tablets to sys-catalog.
   SysCatalogTable::Actions actions;
   actions.table_to_add = table;
   actions.tablets_to_add = tablets;
@@ -1522,7 +1569,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
   TRACE("Wrote table and tablets to system table");
 
-  // f. Commit the in-memory state.
+  // g. Commit the in-memory state.
+  abort_hms.cancel();
   table->mutable_metadata()->CommitMutation();
 
   for (const auto& tablet : tablets) {
@@ -1530,7 +1578,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
   abort_mutations.cancel();
 
-  // g. Add the tablets to the table.
+  // h. Add the tablets to the table.
   //
   // We can't reuse the above WRITE tablet locks for this because
   // AddRemoveTablets() will read from the clean state, which is empty for
@@ -1543,7 +1591,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     tablet->metadata().ReadUnlock();
   }
 
-  // h. Make the new table and tablets visible in the catalog.
+  // i. Make the new table and tablets visible in the catalog.
   {
     std::lock_guard<LockType> l(lock_);
 
@@ -1682,11 +1730,43 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
     return s;
   }
 
+  // 2. Drop the HMS table entry.
+  //
+  // This step comes before modifying the sys catalog so that we can ensure the
+  // HMS is available. We do not allow dropping tables while the HMS is
+  // unavailable, because that would cause the catalogs to become unsynchronized.
+  if (hms_catalog_) {
+    Status s = hms_catalog_->DropTable(table->id(), l.data().name());
+    if (!s.ok()) {
+      // CloneAndPrepend returns a new Status instead of modifying 's' in
+      // place, so the result must be assigned back; otherwise the context is
+      // missing from both the log line and the error returned to the caller.
+      s = s.CloneAndPrepend(Substitute("an error occurred while dropping table $0 in the HMS",
+                                       l.data().name()));
+      LOG(WARNING) << s.ToString();
+      SetupError(resp->mutable_error(), MasterErrorPB::HIVE_METASTORE_ERROR, s);
+      return s;
+    }
+  }
+  // Re-create the HMS entry if we exit early.
+  auto abort_hms = MakeScopedCleanup([&] {
+      // TODO(dan): figure out how to test this.
+      if (hms_catalog_) {
+        TRACE("Rolling back HMS table deletion");
+        Schema schema;
+        Status s = SchemaFromPB(l.data().pb.schema(), &schema);
+        if (!s.ok()) {
+          LOG(WARNING) << "Failed to decode schema during HMS table entry deletion roll-back: "
+                       << s.ToString();
+          return;
+        }
+        WARN_NOT_OK(hms_catalog_->CreateTable(table->id(), l.data().name(), schema),
+                    "An error occurred while attempting to roll-back HMS table entry deletion");
+      }
+  });
+
   TRACE("Modifying in-memory table state")
   string deletion_msg = "Table deleted at " + LocalTimeAsString();
   l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
 
-  // 2. Look up the tablets, lock them, and mark them as deleted.
+  // 3. Look up the tablets, lock them, and mark them as deleted.
   {
     TRACE("Locking tablets");
     vector<scoped_refptr<TabletInfo>> tablets;
@@ -1700,7 +1780,7 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
           SysTabletsEntryPB::DELETED, deletion_msg);
     }
 
-    // 3. Update sys-catalog with the removed table and tablet state.
+    // 4. Update sys-catalog with the removed table and tablet state.
     TRACE("Removing table and tablets from system table");
     SysCatalogTable::Actions actions;
     actions.table_to_update = table;
@@ -1714,8 +1794,9 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
     }
 
     // The operation has been written to sys-catalog; now it must succeed.
+    abort_hms.cancel();
 
-    // 4. Remove the table from the by-name map.
+    // 5. Remove the table from the by-name map.
     {
       TRACE("Removing table from by-name map");
       std::lock_guard<LockType> l_map(lock_);
@@ -1724,19 +1805,19 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
       }
     }
 
-    // 5. Commit the dirty tablet state.
+    // 6. Commit the dirty tablet state.
     lock.Commit();
   }
 
-  // 6. Commit the dirty table state.
+  // 7. Commit the dirty table state.
   TRACE("Committing in-memory state");
   l.Commit();
 
-  // 7. Abort any extant tasks belonging to the table.
+  // 8. Abort any extant tasks belonging to the table.
   TRACE("Aborting table tasks");
   table->AbortTasks();
 
-  // 8. Send a DeleteTablet() request to each tablet replica in the table.
+  // 9. Send a DeleteTablet() request to each tablet replica in the table.
   SendDeleteTableRequest(table, deletion_msg);
 
   VLOG(1) << "Deleted table " << table->ToString();
@@ -2050,23 +2131,25 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   // 3. Calculate and validate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
-  if (!alter_schema_steps.empty()) {
-    TRACE("Apply alter schema");
-    Status s = ApplyAlterSchemaSteps(l.data().pb, alter_schema_steps, &new_schema, &next_col_id);
-    if (!s.ok()) {
-      SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
-      return s;
-    }
-    DCHECK_NE(next_col_id, 0);
-    DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
-              static_cast<int>(Schema::kColumnNotFound));
 
-    // Just validate the schema, not the name (validated below).
-    s = ValidateClientSchema(boost::none, new_schema);
-    if (!s.ok()) {
-      SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
-      return s;
-    }
+  // Apply the alter steps. Note that there may be no steps, in which case this
+  // is essentially a no-op. It's still important to execute because
+  // ApplyAlterSchemaSteps populates 'new_schema', which is used below.
+  TRACE("Apply alter schema");
+  Status s = ApplyAlterSchemaSteps(l.data().pb, alter_schema_steps, &new_schema, &next_col_id);
+  if (!s.ok()) {
+    SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
+    return s;
+  }
+  DCHECK_NE(next_col_id, 0);
+  DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
+            static_cast<int>(Schema::kColumnNotFound));
+
+  // Just validate the schema, not the name (validated below).
+  s = ValidateClientSchema(boost::none, new_schema);
+  if (!s.ok()) {
+    SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
+    return s;
   }
 
   // 4. Validate and try to acquire the new table name.
@@ -2165,7 +2248,43 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                            LocalTimeAsString()));
   }
 
-  // 7. Update sys-catalog with the new table schema and tablets to add/drop.
+  // 7. Update the HMS table entry.
+  //
+  // This step comes before modifying the sys catalog so that we can ensure the
+  // HMS is available. We do not allow altering tables while the HMS is
+  // unavailable, because that would cause the catalogs to become unsynchronized.
+  if (hms_catalog_ != nullptr && has_metadata_changes) {
+    const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+    Status s = hms_catalog_->AlterTable(table->id(), table_name, new_name, new_schema);
+
+    if (!s.ok()) {
+      s = s.CloneAndPrepend(Substitute("an error occurred while altering table $0 in the HMS",
+                                       table_name));
+      LOG(WARNING) << s.ToString();
+      SetupError(resp->mutable_error(), MasterErrorPB::HIVE_METASTORE_ERROR, s);
+      return s;
+    }
+  }
+  // Roll-back the HMS alteration if we exit early.
+  auto abort_hms = MakeScopedCleanup([&] {
+      // TODO(dan): figure out how to test this.
+      if (hms_catalog_) {
+        TRACE("Rolling back HMS table alterations");
+        Schema schema;
+        Status s = SchemaFromPB(l.data().pb.schema(), &schema);
+        if (!s.ok()) {
+          LOG(WARNING) << "Failed to decode schema during HMS table entry alteration roll-back: "
+                       << s.ToString();
+          return;
+        }
+        const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+
+        WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), new_name, table_name, schema),
+                    "An error occurred while attempting to roll-back HMS table entry alteration");
+      }
+  });
+
+  // 8. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
   string deletion_msg = "Partition dropped at " + LocalTimeAsString();
   SysCatalogTable::Actions actions;
@@ -2186,7 +2305,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   }
   actions.tablets_to_update = tablets_to_drop;
 
-  Status s = sys_catalog_->Write(actions);
+  s = sys_catalog_->Write(actions);
   if (!s.ok()) {
     s = s.CloneAndPrepend("an error occurred while updating the sys-catalog");
     LOG(WARNING) << s.ToString();
@@ -2194,9 +2313,12 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return s;
   }
 
-  // 8. Commit the in-memory state.
+  // 9. Commit the in-memory state.
   {
     TRACE("Committing alterations to in-memory state");
+
+    abort_hms.cancel();
+
     // Commit new tablet in-memory state. This doesn't require taking the global
     // lock since the new tablets are not yet visible, because they haven't been
     // added to the table or tablet index.

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 4efc318..0bfdb76 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -38,6 +38,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/hms/hms_catalog.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/ts_manager.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
@@ -58,7 +59,7 @@ class NodeInstancePB;
 class PartitionPB;
 class PartitionSchema;
 class Schema;
-class ThreadPool;
+class ThreadPool; // IWYU pragma: keep
 struct ColumnId;
 
 // Working around FRIEND_TEST() ugliness.
@@ -73,7 +74,7 @@ class RpcContext;
 namespace security {
 class Cert;
 class PrivateKey;
-class TokenSigningPublicKeyPB;
+class TokenSigningPublicKeyPB; // IWYU pragma: keep
 } // namespace security
 
 namespace consensus {
@@ -87,7 +88,7 @@ class TabletReplica;
 
 namespace master {
 
-class CatalogManagerBgTasks;
+class CatalogManagerBgTasks; // IWYU pragma: keep
 class Master;
 class SysCatalogTable;
 class TSDescriptor;
@@ -917,6 +918,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   friend class CatalogManagerBgTasks;
   gscoped_ptr<CatalogManagerBgTasks> background_tasks_;
 
+  std::unique_ptr<hms::HmsCatalog> hms_catalog_;
+
   enum State {
     kConstructed,
     kStarting,

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 9af60a4..3859ab5 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -78,6 +78,9 @@ message MasterErrorPB {
     // The callee detected that its replica management scheme is incompatible
     // with the caller's scheme.
     INCOMPATIBLE_REPLICA_MANAGEMENT = 12;
+
+    // An operation involving the Hive Metastore failed.
+    HIVE_METASTORE_ERROR = 13;
   }
 
   // The error code.

http://git-wip-us.apache.org/repos/asf/kudu/blob/02560c18/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 869eb4e..d3e8b9b 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -339,6 +339,11 @@ Status ExternalMiniCluster::StartSingleMaster() {
   opts.logtostderr = opts_.logtostderr;
 
   opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0);
+  if (opts_.enable_hive_metastore) {
+    opts.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris()));
+    opts.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0",
+                                             opts_.enable_kerberos));
+  }
   scoped_refptr<ExternalMaster> master = new ExternalMaster(opts);
   if (opts_.enable_kerberos) {
     // The bind host here is the hostname that will be used to generate the
@@ -385,6 +390,11 @@ Status ExternalMiniCluster::StartDistributedMasters() {
     opts.extra_flags = SubstituteInFlags(flags, i);
     opts.start_process_timeout = opts_.start_process_timeout;
     opts.rpc_bind_address = peer_hostports[i];
+    if (opts_.enable_hive_metastore) {
+      opts.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris()));
+      opts.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0",
+                                               opts_.enable_kerberos));
+    }
     opts.logtostderr = opts_.logtostderr;
 
     scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts);


Mime
View raw message