kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From abu...@apache.org
Subject [kudu] 01/04: hms: have tools ignore other clusters
Date Sat, 01 Jun 2019 08:38:11 GMT
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fe0889eddc279b1deecc07e7fe6d745edf92047a
Author: Andrew Wong <awong@apache.org>
AuthorDate: Wed May 22 19:42:12 2019 -0700

    hms: have tools ignore other clusters
    
    Currently, the HMS tooling will check and fix the master addresses field
    of HMS entries based on strict string comparison, assuming that the HMS
    entry found for a Kudu table _should_ belong to the Kudu cluster being
    operated on.
    
    However, it is possible for there to exist metadata for multiple Kudu
    clusters in a single Hive Metastore instance. As such, tooling to check
    and fix a Kudu cluster's HMS integration shouldn't necessarily affect
    another's.
    
    This patch addresses this by imbuing the HMS tooling with the following
    behavior:
    - If any master address in a given HMS entry coincides with the expected
      Kudu masters, fix the entry's master addresses (as was the behavior
      before); this is common when adding new masters or replacing dead ones.
    - If none of the master addresses in the HMS entry coincide with those
      expected, the HMS entry may belong to another Kudu cluster. By
      default, the tools will ignore such HMS entries. The user can specify
      --noignore-other-clusters to have the tools check/fix such master
      addresses.
    
    Change-Id: Ib4e2ad5835fd7fedd1e963d234b153c1df5f8766
    Reviewed-on: http://gerrit.cloudera.org:8080/13411
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <hao.hao@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc  | 102 +++++++++++++++++++++++++++++++++++++-
 src/kudu/tools/tool_action_hms.cc |  46 +++++++++++++++--
 2 files changed, 143 insertions(+), 5 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index f01a73c..d872724 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3447,7 +3447,8 @@ TEST_P(ToolTestKerberosParameterized, TestCheckAndAutomaticFixHmsMetadata)
{
   ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_master_addrs"));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_master_addrs",
         &inconsistent_master_addrs));
-  HmsCatalog invalid_hms_catalog("invalid-master-addrs");
+  HmsCatalog invalid_hms_catalog(Substitute("$0,extra_masters",
+      cluster_->master_rpc_addrs()[0].ToString()));
   ASSERT_OK(invalid_hms_catalog.Start());
   ASSERT_OK(invalid_hms_catalog.CreateTable(
         inconsistent_master_addrs->id(), inconsistent_master_addrs->name(), kUsername,
@@ -3778,6 +3779,105 @@ TEST_P(ToolTestKerberosParameterized, TestCheckAndManualFixHmsMetadata)
{
   }), kudu_tables);
 }
 
+namespace {
+
+// Rewrites the specified HMS table, replacing the given parameter with the
+// given value.
+Status AlterHmsWithReplacedParam(HmsClient* hms_client,
+                                 const string& db, const string& table,
+                                 const string& param, const string& val) {
+  hive::Table updated_hms_table;
+  RETURN_NOT_OK(hms_client->GetTable(db, table, &updated_hms_table));
+  updated_hms_table.parameters[param] = val;
+  return hms_client->AlterTable(db, table, updated_hms_table);
+}
+
+} // anonymous namespace
+
+TEST_F(ToolTest, TestHmsIgnoresDifferentMasters) {
+  ExternalMiniClusterOptions opts;
+  opts.num_masters = 2;
+  opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
+  NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+
+  thrift::ClientOptions hms_opts;
+  HmsClient hms_client(cluster_->hms()->address(), hms_opts);
+  ASSERT_OK(hms_client.Start());
+
+  shared_ptr<KuduClient> kudu_client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &kudu_client));
+
+  // Create a Kudu table.
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.table"));
+  vector<string> master_addrs;
+  for (const auto& hp : cluster_->master_rpc_addrs()) {
+    master_addrs.emplace_back(hp.ToString());
+  }
+  const string master_addrs_str = JoinStrings(master_addrs, ",");
+  // Do a sanity check that the tool is fine with the existing table.
+  // Note: In the happy case, the tool will not log to stdout or stderr.
+  NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addrs_str)));
+
+  // Check that the tool will be OK with the addresses in the HMS being
+  // reordered.
+  {
+    std::reverse(master_addrs.begin(), master_addrs.end());
+    hive::Table hms_table_reversed_masters;
+    ASSERT_OK(AlterHmsWithReplacedParam(&hms_client, "default", "table",
+        HmsClient::kKuduMasterAddrsKey, JoinStrings(master_addrs, ",")));
+    NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addrs_str)));
+  }
+
+  string out;
+  string err;
+  // Check that the tool will flag cases where the HMS contains masters that
+  // aren't quite right but still overlap with the correct set of masters
+  // (e.g. in the case of a multi-master migration).
+  // Try with an extra master.
+  ASSERT_OK(AlterHmsWithReplacedParam(&hms_client, "default", "table",
+      HmsClient::kKuduMasterAddrsKey, Substitute("$0,other_master_addr", master_addrs_str)));
+  Status s = RunActionStdoutStderrString(
+      Substitute("hms check $0", master_addrs_str), &out, &err);
+  ASSERT_STR_CONTAINS(out, "default.table");
+  NO_FATALS(RunActionStdoutNone(Substitute("hms fix $0", master_addrs_str)));
+  NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addrs_str)));
+
+  // And with a missing master.
+  ASSERT_OK(AlterHmsWithReplacedParam(&hms_client, "default", "table",
+      HmsClient::kKuduMasterAddrsKey, cluster_->master_rpc_addrs()[0].ToString()));
+  s = RunActionStdoutStderrString(Substitute("hms check $0", master_addrs_str), &out,
&err);
+  ASSERT_STR_CONTAINS(out, "default.table");
+  NO_FATALS(RunActionStdoutNone(Substitute("hms fix $0", master_addrs_str)));
+  NO_FATALS(RunActionStdoutNone(Substitute("hms check $0", master_addrs_str)));
+
+  // Set the masters to point to an entirely different set of masters.
+  ASSERT_OK(AlterHmsWithReplacedParam(&hms_client, "default", "table",
+      HmsClient::kKuduMasterAddrsKey, "other_master_addrs"));
+
+  // The check tool will ignore the HMS metadata from the other cluster, and
+  // view the Kudu table as orphaned.
+  s = RunActionStdoutStderrString(
+      Substitute("hms check $0", master_addrs_str), &out, &err);
+  ASSERT_STR_CONTAINS(out, "default.table");
+
+  // Attempting to fix this orphaned Kudu table will lead the tool to attempt
+  // to create an HMS table entry, though one already exists.
+  s = RunActionStdoutStderrString(
+      Substitute("hms fix $0", master_addrs_str), &out, &err);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(err, "table already exists");
+
+  // And when specifying the right flag, it shouldn't ignore the other masters,
+  // and should be able to fix the master addresses.
+  s = RunActionStdoutStderrString(
+      Substitute("hms check $0 --noignore-other-clusters", master_addrs_str), &out, &err);
+  ASSERT_STR_CONTAINS(out, "default.table");
+  NO_FATALS(RunActionStdoutNone(
+      Substitute("hms fix $0 --noignore-other-clusters", master_addrs_str)));
+  NO_FATALS(RunActionStdoutNone(
+      Substitute("hms check $0 --noignore-other-clusters", master_addrs_str)));
+}
+
 TEST_F(ToolTest, TestHmsPrecheck) {
   ExternalMiniClusterOptions opts;
   opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
diff --git a/src/kudu/tools/tool_action_hms.cc b/src/kudu/tools/tool_action_hms.cc
index b4bd3e3..f96d462 100644
--- a/src/kudu/tools/tool_action_hms.cc
+++ b/src/kudu/tools/tool_action_hms.cc
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <iostream>
+#include <iterator>
 #include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <type_traits>
 #include <unordered_map>
@@ -63,6 +66,9 @@ DEFINE_bool(fix_inconsistent_tables, true,
 DEFINE_bool(upgrade_hms_tables, true,
     "Upgrade Hive Metastore tables from the legacy Impala metadata format to the "
     "new Kudu metadata format.");
+DEFINE_bool(ignore_other_clusters, true,
+    "Whether to ignore entirely separate Kudu clusters, as indicated by a "
+    "different set of master addresses.");
 
 namespace kudu {
 
@@ -81,6 +87,7 @@ using std::endl;
 using std::make_pair;
 using std::ostream;
 using std::pair;
+using std::set;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
@@ -156,15 +163,22 @@ Status HmsDowngrade(const RunnerContext& context) {
   return Status::OK();
 }
 
-// Given a kudu table and a hms table, checks if their metadata is in sync.
-bool IsSynced(const string& master_addresses,
+// Given a Kudu table and a HMS table, checks if their metadata is in sync.
+bool IsSynced(const set<string>& master_addresses,
               const KuduTable& kudu_table,
               const hive::Table& hms_table) {
+  DCHECK(!master_addresses.empty());
   auto schema = KuduSchema::ToSchema(kudu_table.schema());
   hive::Table hms_table_copy(hms_table);
   // TODO(ghenke): Get the owner from Kudu?
+  const string* hms_masters_field = FindOrNull(hms_table.parameters,
+                                               HmsClient::kKuduMasterAddrsKey);
+  if (!hms_masters_field ||
+      master_addresses != static_cast<set<string>>(Split(*hms_masters_field,
","))) {
+    return false;
+  }
   Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), hms_table.owner,
schema,
-                                       master_addresses, hms_table.tableType, &hms_table_copy);
+                                       *hms_masters_field, hms_table.tableType, &hms_table_copy);
   return s.ok() && hms_table_copy == hms_table;
 }
 
@@ -298,6 +312,7 @@ Status AnalyzeCatalogs(const string& master_addrs,
   // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
   // entries which reference non-existent Kudu tables, and group the rest by
   // table type and table ID.
+  const set<string> kudu_master_addrs = Split(master_addrs, ",");
   vector<hive::Table> orphan_hms_tables;
   unordered_map<string, vector<hive::Table>> managed_hms_tables_by_id;
   unordered_map<string, vector<hive::Table>> external_hms_tables_by_id;
@@ -305,6 +320,27 @@ Status AnalyzeCatalogs(const string& master_addrs,
     vector<hive::Table> hms_tables;
     RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
     for (hive::Table& hms_table : hms_tables) {
+      // If the addresses in the HMS entry don't overlap at all with the
+      // expected addresses, the entry is likely from another Kudu cluster.
+      // Ignore it if appropriate.
+      if (FLAGS_ignore_other_clusters) {
+        const string* hms_masters_field = FindOrNull(hms_table.parameters,
+                                                    HmsClient::kKuduMasterAddrsKey);
+        vector<string> master_intersection;
+        if (hms_masters_field) {
+          const set<string> hms_master_addrs = Split(*hms_masters_field, ",");
+          std::set_intersection(hms_master_addrs.begin(), hms_master_addrs.end(),
+                                kudu_master_addrs.begin(), kudu_master_addrs.end(),
+                                std::back_inserter(master_intersection));
+        }
+        if (master_intersection.empty()) {
+          LOG(INFO) << Substitute("Skipping HMS table $0.$1 with different "
+              "masters specified: $2", hms_table.dbName, hms_table.tableName,
+              hms_masters_field ? *hms_masters_field : "<none>");
+          continue;
+        }
+      }
+
       // If this is a non-legacy, managed table, we expect a table ID to exist
       // in the HMS entry; look up the Kudu table by ID. Otherwise, look it up
       // by table name.
@@ -366,7 +402,7 @@ Status AnalyzeCatalogs(const string& master_addrs,
       hive::Table& hms_table = (*hms_tables)[0];
       const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
       if (storage_handler == HmsClient::kKuduStorageHandler &&
-          !IsSynced(master_addrs, *kudu_table, hms_table)) {
+          !IsSynced(kudu_master_addrs, *kudu_table, hms_table)) {
         stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
       } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
         legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
@@ -758,6 +794,7 @@ unique_ptr<Mode> BuildHmsMode() {
           .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
           .AddOptionalParameter("hive_metastore_sasl_enabled", boost::none, kHmsSaslEnabledDesc)
           .AddOptionalParameter("hive_metastore_uris", boost::none, kHmsUrisDesc)
+          .AddOptionalParameter("ignore_other_clusters")
           .Build();
 
   unique_ptr<Action> hms_downgrade =
@@ -780,6 +817,7 @@ unique_ptr<Mode> BuildHmsMode() {
         .AddOptionalParameter("upgrade_hms_tables")
         .AddOptionalParameter("hive_metastore_sasl_enabled", boost::none, kHmsSaslEnabledDesc)
         .AddOptionalParameter("hive_metastore_uris", boost::none, kHmsUrisDesc)
+        .AddOptionalParameter("ignore_other_clusters")
         .Build();
 
   unique_ptr<Action> hms_precheck =


Mime
View raw message