kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [2/3] kudu git commit: KUDU-2191: HMS inconsistent metadata fix tool
Date Wed, 27 Jun 2018 00:09:42 GMT
KUDU-2191: HMS inconsistent metadata fix tool

This commit introduces a CLI tool that fixes inconsistent metadata for
the Hive Metastore integration. It corrects the table name based on the
user's input, and it fixes the master addresses property, column names
and column types based on the values in Kudu. A test case using an
external mini cluster is added to ensure the tool works as expected.

Change-Id: I63c694b5d9877cfbd218139f2e9ecf05fd69c997
Reviewed-on: http://gerrit.cloudera.org:8080/10303
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <danburkert@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/96b2f58b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/96b2f58b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/96b2f58b

Branch: refs/heads/master
Commit: 96b2f58baaa33d47263205549273e881dc51eef0
Parents: 43f076f
Author: hahao <hao.hao@cloudera.com>
Authored: Thu May 3 15:48:53 2018 -0700
Committer: Hao Hao <hao.hao@cloudera.com>
Committed: Tue Jun 26 23:41:31 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/client.h          |   8 +-
 src/kudu/tools/kudu-tool-test.cc  | 104 +++++++++++++++++++++
 src/kudu/tools/tool_action_hms.cc | 165 ++++++++++++++++++++++++++++-----
 3 files changed, 250 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/96b2f58b/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 1ce4be6..9ad7afc 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -63,9 +63,9 @@ class KuduClient;
 namespace tools {
 class LeaderMasterProxy;
 
-Status AlterKuduTable(client::KuduClient* kudu_client,
-                      const std::string& name,
-                      const std::string& new_name);
+Status AlterKuduTableOnly(client::KuduClient* kudu_client,  // Kudu-only rename; HMS untouched.
+                          const std::string& name,
+                          const std::string& new_name);
 } // namespace tools
 
 namespace client {
@@ -1192,7 +1192,7 @@ class KUDU_EXPORT KuduTableAlterer {
 
   friend class KuduClient;
 
-  friend Status tools::AlterKuduTable(
+  friend Status tools::AlterKuduTableOnly(
       client::KuduClient* kudu_client,
       const std::string& name,
       const std::string& new_name);

http://git-wip-us.apache.org/repos/asf/kudu/blob/96b2f58b/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index d08e107..b52e171 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2446,6 +2446,110 @@ TEST_F(ToolTest, TestCheckHmsMetadata) {
   }
 }
 
+TEST_F(ToolTest, TestFixHmsMetadata) {
+  ExternalMiniClusterOptions opts;
+  opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
+  NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  HmsClient hms_client(cluster_->hms()->address(), HmsClientOptions());
+  ASSERT_OK(hms_client.Start());
+  ASSERT_TRUE(hms_client.IsConnected());
+
+  string hms_table_id = "table_id";
+  shared_ptr<KuduClient> kudu_client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&kudu_client));
+
+  {
+    // Create a table with the same name/ID but different schema in Kudu and the HMS.
+    ASSERT_OK(CreateKuduTable(kudu_client, "default.a"));
+    shared_ptr<KuduTable> kudu_table;
+    ASSERT_OK(kudu_client->OpenTable("default.a", &kudu_table));
+    hive::Table hms_table;
+    ASSERT_OK(CreateHmsTable(&hms_client, "default", "a",
+                             HmsClient::kManagedTable, master_addr, kudu_table->id()));
+    ASSERT_OK(hms_client.GetTable("default", "a", &hms_table));
+
+    // Create a table with the same ID but different name/schema in Kudu and the HMS.
+    ASSERT_OK(CreateKuduTable(kudu_client, "default.b"));
+    ASSERT_OK(kudu_client->OpenTable("default.b", &kudu_table));
+    ASSERT_OK(CreateHmsTable(&hms_client, "default", "c",
+                             HmsClient::kManagedTable, master_addr, kudu_table->id()));
+    ASSERT_OK(hms_client.GetTable("default", "c", &hms_table));
+
+    // Create a table in Kudu but not the HMS.
+    ASSERT_OK(CreateKuduTable(kudu_client, "default.d"));
+    ASSERT_OK(kudu_client->OpenTable("default.d", &kudu_table));
+
+    // Create an orphan table in the HMS.
+    ASSERT_OK(CreateHmsTable(&hms_client, "default", "e",
+                             HmsClient::kManagedTable, master_addr, hms_table_id));
+    ASSERT_OK(hms_client.GetTable("default", "e", &hms_table));
+
+    // Create multiple tables in the HMS sharing the same table ID.
+    ASSERT_OK(CreateKuduTable(kudu_client, "default.kudu"));
+    ASSERT_OK(kudu_client->OpenTable("default.kudu", &kudu_table));
+    string id = kudu_table->id();
+
+    ASSERT_OK(CreateHmsTable(&hms_client, "default", "kudu",
+                             HmsClient::kManagedTable, master_addr, id));
+    ASSERT_OK(hms_client.GetTable("default", "kudu", &hms_table));
+
+    ASSERT_OK(CreateHmsTable(&hms_client, "default", "diff_name",
+                             HmsClient::kManagedTable, master_addr, id));
+    ASSERT_OK(hms_client.GetTable("default", "diff_name", &hms_table));
+  }
+
+  // 1. Restart the external mini cluster to enable Hive Metastore integration.
+  cluster_->EnableMetastoreIntegration();
+  cluster_->ShutdownNodes(cluster::ClusterNodes::ALL);
+  ASSERT_OK(cluster_->Restart());
+
+  // 2. Run the fix tool and expect it to fail, since two HMS tables share a table ID.
+  {
+    string err;
+    RunActionStderrString(Substitute("hms fix $0 --unlock_experimental_flags=true "
+                                     "--hive_metastore_uris=$1",
+                                     master_addr, cluster_->hms()->uris()), &err);
+    ASSERT_STR_CONTAINS(err, "Illegal state: error fixing inconsistent metadata: "
+                             "Found more than one tables");
+  }
+
+  // 3. Delete one of the tables that share the same table ID in the HMS and then
+  //    run the fix tool again and expect it to succeed.
+  {
+    hive::EnvironmentContext env_ctx;
+    ASSERT_OK(hms_client.DropTable("default", "diff_name", env_ctx));
+    string out;
+    NO_FATALS(RunActionStdinStdoutString(Substitute("hms fix $0 --unlock_experimental_flags=true "
+                                                    "--hive_metastore_uris=$1",
+                                                    master_addr, cluster_->hms()->uris()),
+                                         "default.c\n", &out));
+
+    NO_FATALS(RunActionStdoutString(Substitute("hms check $0 --unlock_experimental_flags=true "
+                                               "--hive_metastore_uris=$1",
+                                               master_addr, cluster_->hms()->uris()), &out));
+    ASSERT_STR_CONTAINS(out, "OK");
+
+    shared_ptr<KuduTable> kudu_table;
+    ASSERT_OK(kudu_client->OpenTable("default.a", &kudu_table));
+    hive::Table hms_table;
+    ASSERT_OK(hms_client.GetTable("default", "a", &hms_table));
+
+    ASSERT_OK(kudu_client->OpenTable("default.c", &kudu_table));
+    ASSERT_OK(hms_client.GetTable("default", "c", &hms_table));
+
+    ASSERT_OK(kudu_client->OpenTable("default.d", &kudu_table));
+    ASSERT_OK(hms_client.GetTable("default", "d", &hms_table));
+
+    Status s = kudu_client->OpenTable("default.e", &kudu_table);
+    ASSERT_TRUE(s.IsNotFound());
+    ASSERT_OK(hms_client.GetTable("default", "e", &hms_table));
+  }
+}
+
 // This test is parameterized on the serialization mode and Kerberos.
 class ControlShellToolTest :
     public ToolTest,

http://git-wip-us.apache.org/repos/asf/kudu/blob/96b2f58b/src/kudu/tools/tool_action_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_hms.cc b/src/kudu/tools/tool_action_hms.cc
index d5aa7a0..c082235 100644
--- a/src/kudu/tools/tool_action_hms.cc
+++ b/src/kudu/tools/tool_action_hms.cc
@@ -32,6 +32,7 @@
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -69,7 +70,7 @@ using strings::Split;
 using strings::Substitute;
 
 DEFINE_bool(enable_input, true,
-            "Whether to enable user input for renaming tables that have hive"
+            "Whether to enable user input for renaming tables that have Hive "
             "incompatible names.");
 
 // The key is the table ID. The value is a pair of a table in Kudu and a
@@ -109,9 +110,9 @@ string RenameHiveIncompatibleTable(const string& name) {
 }
 
 // Only alter the table in Kudu but not in the Hive Metastore.
-Status AlterKuduTable(KuduClient* kudu_client,
-                      const string& name,
-                      const string& new_name) {
+Status AlterKuduTableOnly(KuduClient* kudu_client,
+                          const string& name,
+                          const string& new_name) {
   unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name));
   return alterer->RenameTo(new_name)
                 ->alter_external_catalogs(false)
@@ -137,6 +138,8 @@ Status AlterLegacyKuduTables(KuduClient* kudu_client,
     bool exist;
     RETURN_NOT_OK(kudu_client->TableExists(hms_table.first, &exist));
     if (!exist) {
+      // Warn instead of dropping the table in the HMS to avoid breakage for
+      // installations which have multiple Kudu clusters pointed at the same HMS.
       LOG(WARNING) << Substitute("Found orphan table $0.$1 in the Hive Metastore",
                                  hms_table.second.dbName, hms_table.second.tableName);
     }
@@ -159,7 +162,7 @@ Status AlterLegacyKuduTables(KuduClient* kudu_client,
         RETURN_NOT_OK(kudu_client->TableExists(new_table_name, &exist));
         if (!exist) {
           // TODO(Hao): Use notification listener to avoid race conditions.
-          s = AlterKuduTable(kudu_client, table_name, new_table_name).AndThen([&] {
+          s = AlterKuduTableOnly(kudu_client, table_name, new_table_name).AndThen([&] {
             return hms_catalog->UpgradeLegacyImpalaTable(kudu_table->id(),
                 hms_table->dbName, hms_table->tableName,
                 client::SchemaFromKuduSchema(kudu_table->schema()));
@@ -184,7 +187,7 @@ Status AlterLegacyKuduTables(KuduClient* kudu_client,
         s = hms_catalog->CreateTable(kudu_table->id(), new_table_name, schema);
       }
       s = s.AndThen([&] {
-        return AlterKuduTable(kudu_client, table_name, new_table_name);
+        return AlterKuduTableOnly(kudu_client, table_name, new_table_name);
       });
     }
 
@@ -286,7 +289,7 @@ bool isSynced(const string& master_addresses,
 }
 
 // Filter orphan tables from the unsynchronized tables map.
-void FilterUnsyncedTables(TablesMap* tables_map) {
+void FilterOrphanedTables(TablesMap* tables_map) {
   for (auto it = tables_map->cbegin(); it != tables_map->cend();) {
     // If the kudu table is empty, then these table in the HMS are
     // orphan tables. Filter it as we do not care about orphan tables.
@@ -349,21 +352,21 @@ Status PrintLegacyTables(const vector<hive::Table>& tables,
   return table.PrintTo(out);
 }
 
-Status CheckHmsMetadata(const RunnerContext& context) {
-  const string& master_addresses_str = FindOrDie(context.required_args,
-                                                 kMasterAddressesArg);
-  shared_ptr<KuduClient> kudu_client;
-  unique_ptr<HmsCatalog> hms_catalog;
-  Init(context, &kudu_client, &hms_catalog);
-
+// Collects the tables whose metadata is out of sync between Kudu and the HMS
+// into 'unsynced_tables_map', keyed by table ID, and the HMS tables that use
+// the legacy Kudu storage handler into 'legacy_tables'. Orphan tables in the
+// HMS are filtered out of 'unsynced_tables_map'.
+Status RetrieveUnsyncedTables(const unique_ptr<HmsCatalog>& hms_catalog,
+                              const shared_ptr<KuduClient>& kudu_client,
+                              const string& master_addresses_str,
+                              TablesMap* unsynced_tables_map,
+                              vector<hive::Table>* legacy_tables) {
   // 1. Identify all Kudu table in the HMS entries.
   vector<hive::Table> hms_tables;
   RETURN_NOT_OK(hms_catalog->RetrieveTables(&hms_tables));
 
   // 2. Walk through all the Kudu tables in the HMS and identify any
   //    out of sync tables.
-  TablesMap unsynced_tables_map;
-  std::vector<hive::Table> legacy_tables;
   for (auto& hms_table : hms_tables) {
     string hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
     if (hms_table.parameters[HmsClient::kStorageHandlerKey] ==
@@ -372,19 +375,19 @@ Status CheckHmsMetadata(const RunnerContext& context) {
       shared_ptr<KuduTable> kudu_table;
       Status s = kudu_client->OpenTable(table_name, &kudu_table);
       if (s.ok() && !isSynced(master_addresses_str, kudu_table, hms_table)) {
-        unsynced_tables_map[kudu_table->id()].first = kudu_table;
-        unsynced_tables_map[hms_table_id].second.emplace_back(hms_table);
+        (*unsynced_tables_map)[kudu_table->id()].first = kudu_table;
+        (*unsynced_tables_map)[hms_table_id].second.emplace_back(hms_table);
       } else if (s.IsNotFound()) {
         // We cannot determine whether this table is an orphan table in the HMS now, since
         // there may be other tables in Kudu shares the same table ID but not the same name.
         // So do it in the filtering step below.
-        unsynced_tables_map[hms_table_id].second.emplace_back(hms_table);
+        (*unsynced_tables_map)[hms_table_id].second.emplace_back(hms_table);
       } else {
         RETURN_NOT_OK(s);
       }
     } else if (hms_table.parameters[HmsClient::kStorageHandlerKey] ==
                HmsClient::kLegacyKuduStorageHandler) {
-      legacy_tables.push_back(hms_table);
+      legacy_tables->push_back(hms_table);
     }
   }
 
@@ -397,12 +400,29 @@ Status CheckHmsMetadata(const RunnerContext& context) {
     if (!ContainsKey(hms_tables_map, table_name)) {
       shared_ptr<KuduTable> kudu_table;
       RETURN_NOT_OK(kudu_client->OpenTable(table_name, &kudu_table));
-      unsynced_tables_map[kudu_table->id()].first = kudu_table;
+      (*unsynced_tables_map)[kudu_table->id()].first = kudu_table;
     }
   }
 
-  // Filter orphan tables.
-  FilterUnsyncedTables(&unsynced_tables_map);
+  // 4. Filter orphan tables.
+  FilterOrphanedTables(unsynced_tables_map);
+
+  return Status::OK();
+}
+
+Status CheckHmsMetadata(const RunnerContext& context) {
+  const string& master_addresses_str = FindOrDie(context.required_args,
+                                                 kMasterAddressesArg);
+  shared_ptr<KuduClient> kudu_client;
+  unique_ptr<HmsCatalog> hms_catalog;
+  Init(context, &kudu_client, &hms_catalog);
+
+  TablesMap unsynced_tables_map;
+  vector<hive::Table> legacy_tables;
+  RETURN_NOT_OK_PREPEND(RetrieveUnsyncedTables(hms_catalog, kudu_client,
+                                               master_addresses_str, &unsynced_tables_map,
+                                               &legacy_tables),
+                        "error fetching unsynchronized tables");
 
   // All good.
   if (unsynced_tables_map.empty() && legacy_tables.empty()) {
@@ -427,6 +443,108 @@ Status CheckHmsMetadata(const RunnerContext& context) {
   return Status::RuntimeError("metadata check tool discovered inconsistent data");
 }
 
+// Fixes the out-of-sync entries in 'tables_map', which is keyed by table ID.
+// Kudu is treated as the source of truth for the master addresses, column
+// names and column types; the HMS is the source of truth for table names.
+//
+// Returns an IllegalState error, without modifying either catalog, if any
+// table ID is shared by more than one table in the HMS.
+Status FixUnsyncedTables(KuduClient* kudu_client,
+                         HmsCatalog* hms_catalog,
+                         const TablesMap& tables_map) {
+  // 1. If more than one table shares the same table ID in the HMS, return an
+  //    error as it is unsafe to do an automated fix. Every entry is checked
+  //    before anything is changed so that a failure leaves both catalogs
+  //    untouched.
+  for (const auto& entry : tables_map) {
+    const vector<hive::Table>& hms_tables = entry.second.second;
+    if (hms_tables.size() > 1) {
+      auto table_to_string = [] (const hive::Table& hms_table) {
+          return Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+      };
+      return Status::IllegalState(
+          Substitute("Found more than one tables [$0] in the Hive Metastore, with the "
+                     "same table ID: $1", JoinMapped(hms_tables, table_to_string, ", "),
+                     entry.first));
+    }
+  }
+
+  for (const auto& entry : tables_map) {
+    const string& table_id = entry.first;
+    const KuduTable& kudu_table = *entry.second.first;
+    const vector<hive::Table>& hms_tables = entry.second.second;
+
+    // 2. Create the table in the HMS if there is no corresponding table there.
+    string kudu_table_name = kudu_table.name();
+    Schema schema = client::SchemaFromKuduSchema(kudu_table.schema());
+    cout << Substitute("Table (ID $0) is out of sync.", table_id) << endl;
+    if (hms_tables.empty()) {
+      RETURN_NOT_OK(hms_catalog->CreateTable(table_id, kudu_table_name, schema));
+      continue;
+    }
+
+    // 3. If the table name in Kudu is different from the one in the HMS, rename
+    //    the table in Kudu to match, since the HMS is considered the source of
+    //    truth for table names. The rename is applied to Kudu only.
+    const hive::Table& hms_table = hms_tables[0];
+    string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+    if (kudu_table_name != hms_table_name) {
+      cout << Substitute("Renaming Kudu table $0 [id=$1] to $2 to match the Hive "
+                         "Metastore catalog.", kudu_table_name, table_id, hms_table_name)
+           << endl;
+      RETURN_NOT_OK(AlterKuduTableOnly(kudu_client, kudu_table_name, hms_table_name));
+      kudu_table_name = hms_table_name;
+    }
+
+    // 4. Correct the master addresses, and the column names and types in the
+    //    HMS based on the information in Kudu.
+    cout << Substitute("Updating metadata of table $0 [id=$1] in Hive Metastore catalog.",
+                       kudu_table_name, table_id) << endl;
+    RETURN_NOT_OK(hms_catalog->AlterTable(table_id, hms_table_name,
+                                          hms_table_name, schema));
+  }
+
+  return Status::OK();
+}
+
+// Implements the 'hms fix' action: fixes the tables whose metadata is out of
+// sync between Kudu and the HMS.
+Status FixHmsMetadata(const RunnerContext& context) {
+  const string& master_addresses_str = FindOrDie(context.required_args,
+                                                 kMasterAddressesArg);
+  shared_ptr<KuduClient> kudu_client;
+  unique_ptr<HmsCatalog> hms_catalog;
+  Init(context, &kudu_client, &hms_catalog);
+
+  TablesMap unsynced_tables_map;
+  vector<hive::Table> legacy_tables;
+  RETURN_NOT_OK_PREPEND(RetrieveUnsyncedTables(hms_catalog, kudu_client,
+                                               master_addresses_str,
+                                               &unsynced_tables_map,
+                                               &legacy_tables),
+                        "error fetching unsynchronized tables");
+
+  if (unsynced_tables_map.empty() && legacy_tables.empty()) {
+    cout << "Metadata between Kudu and Hive Metastore is in sync." << endl;
+    return Status::OK();
+  }
+
+  // If there are any legacy tables, print them and return a runtime error
+  // without fixing anything.
+  if (!legacy_tables.empty()) {
+    RETURN_NOT_OK_PREPEND(PrintLegacyTables(legacy_tables, cout),
+                          "error printing legacy tables");
+    return Status::RuntimeError("metadata fix tool encountered fatal errors");
+  }
+
+  // Fix inconsistent metadata.
+  RETURN_NOT_OK_PREPEND(FixUnsyncedTables(kudu_client.get(), hms_catalog.get(),
+                                          unsynced_tables_map),
+                        "error fixing inconsistent metadata");
+  cout << "DONE" << endl;
+  return Status::OK();
+}
+
 unique_ptr<Mode> BuildHmsMode() {
   unique_ptr<Action> hms_upgrade =
       ActionBuilder("upgrade", &HmsUpgrade)
@@ -454,9 +560,22 @@ unique_ptr<Mode> BuildHmsMode() {
           .AddOptionalParameter("hive_metastore_conn_timeout")
           .Build();
 
+  unique_ptr<Action> hms_fix =
+      ActionBuilder("fix", &FixHmsMetadata)
+          .Description("Fix the metadata inconsistency between Kudu and Hive Metastores")
+          .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+          .AddOptionalParameter("hive_metastore_uris")
+          .AddOptionalParameter("hive_metastore_sasl_enabled")
+          .AddOptionalParameter("hive_metastore_retry_count")
+          .AddOptionalParameter("hive_metastore_send_timeout")
+          .AddOptionalParameter("hive_metastore_recv_timeout")
+          .AddOptionalParameter("hive_metastore_conn_timeout")
+          .Build();
+
   return ModeBuilder("hms").Description("Operate on remote Hive Metastores")
                            .AddAction(std::move(hms_upgrade))
                            .AddAction(std::move(hms_check))
+                           .AddAction(std::move(hms_fix))
                            .Build();
 }
 


Mime
View raw message