kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [2/2] incubator-kudu git commit: master: fix corruption when AlterTable() races with CreateTable()
Date Thu, 14 Jul 2016 21:12:23 GMT
master: fix corruption when AlterTable() races with CreateTable()

Admittedly, this is a contrived scenario:
1. T1 tries to create a table named 'foo'.
2. T2 tries to rename the table named 'bar' to 'foo'.

With just the right timing, both operations succeed and the metadata now has
two tables named 'foo', each with a different table ID. The fix is simple:
generalize the "tables being created" logic already used by CreateTable().

Without the fix, the new test failed roughly once in every 50 runs. With the
fix, it did not fail in 1000 runs.

Change-Id: I6c9e4214c09bc47a5a10b12d6ffe8b35906708c9
Reviewed-on: http://gerrit.cloudera.org:8080/3607
Reviewed-by: Dan Burkert <dan@cloudera.com>
Tested-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/05369115
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/05369115
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/05369115

Branch: refs/heads/master
Commit: 05369115d4c4f73eb5dec53ef155ea5fabdf8c87
Parents: d1d3cfe
Author: Adar Dembo <adar@cloudera.com>
Authored: Fri Jul 8 18:56:23 2016 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Thu Jul 14 21:10:10 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc |  80 ++++++++++------
 src/kudu/master/catalog_manager.h  |  16 +++-
 src/kudu/master/master-test.cc     | 165 ++++++++++++++++++++++++++++++++
 3 files changed, 231 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 2d09d82..88bd3e5 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -867,23 +867,25 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     // b. Verify that the table does not exist.
     table = FindPtrOrNull(table_names_map_, req.name());
     if (table != nullptr) {
-      s = Status::AlreadyPresent("Table already exists", table->id());
+      s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
+                                 req.name(), table->id()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
       return s;
     }
 
-    // c. Mark the table as being created (if it isn't already).
-    if (!InsertIfNotPresent(&tables_being_created_, req.name())) {
-      s = Status::ServiceUnavailable("Table is currently being created", req.name());
+    // c. Reserve the table name if possible.
+    if (!InsertIfNotPresent(&reserved_table_names_, req.name())) {
+      s = Status::ServiceUnavailable(Substitute(
+          "New table name $0 is already reserved", req.name()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
       return s;
     }
   }
 
-  // Ensure that if we return, we mark this table as no longer being created.
+  // Ensure that we drop the name reservation upon return.
   auto cleanup = MakeScopedCleanup([&] () {
     std::lock_guard<LockType> l(lock_);
-    CHECK_EQ(1, tables_being_created_.erase(req.name()));
+    CHECK_EQ(1, reserved_table_names_.erase(req.name()));
   });
 
   // d. Create the in-memory representation of the new table and its tablets.
@@ -1204,7 +1206,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
 
   RETURN_NOT_OK(CheckOnline());
 
-  // 1. Lookup the table and verify if it exists
+  // 1. Lookup the table and verify if it exists.
   TRACE("Looking up table");
   scoped_refptr<TableInfo> table;
   RETURN_NOT_OK(FindTable(req->table(), &table));
@@ -1222,10 +1224,22 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return s;
   }
 
+  // 2. Having locked the table, look it up again, in case we raced with
+  //    another AlterTable() that renamed our table.
+  {
+    scoped_refptr<TableInfo> table_again;
+    CHECK_OK(FindTable(req->table(), &table_again));
+    if (table_again == nullptr) {
+      Status s = Status::NotFound("The table does not exist", req->table().DebugString());
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
+    }
+  }
+
   bool has_changes = false;
   string table_name = l.data().name();
 
-  // 2. Calculate new schema for the on-disk state, not persisted yet
+  // 3. Calculate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
   if (req->alter_schema_steps_size()) {
@@ -1241,33 +1255,46 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     has_changes = true;
   }
 
-  // 3. Try to acquire the new table name
+  // 4. Try to acquire the new table name.
   if (req->has_new_table_name()) {
     std::lock_guard<LockType> catalog_lock(lock_);
-
     TRACE("Acquired catalog manager lock");
 
-    // Verify that the table does not exist
+    // Verify that the table does not exist.
     scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name());
     if (other_table != nullptr) {
-      Status s = Status::AlreadyPresent("Table already exists", other_table->id());
+      Status s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
+                                                   req->new_table_name(), table->id()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
       return s;
     }
 
-    // Acquire the new table name (now we have 2 name for the same table)
-    table_names_map_[req->new_table_name()] = table;
-    l.mutable_data()->pb.set_name(req->new_table_name());
+    // Reserve the new table name if possible.
+    if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) {
+      Status s = Status::ServiceUnavailable(Substitute(
+          "Table name $0 is already reserved", req->new_table_name()));
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
+    }
 
+    l.mutable_data()->pb.set_name(req->new_table_name());
     has_changes = true;
   }
 
+  // Ensure that we drop our reservation upon return.
+  auto cleanup = MakeScopedCleanup([&] () {
+    if (req->has_new_table_name()) {
+      std::lock_guard<LockType> l(lock_);
+      CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name()));
+    }
+  });
+
   // Skip empty requests...
   if (!has_changes) {
     return Status::OK();
   }
 
-  // 4. Serialize the schema Increment the version number
+  // 5. Serialize the schema Increment the version number.
   if (new_schema.initialized()) {
     if (!l.data().pb.has_fully_applied_schema()) {
       l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
@@ -1281,7 +1308,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                          l.mutable_data()->pb.version(),
                                          LocalTimeAsString()));
 
-  // 5. Update sys-catalog with the new table schema.
+  // 6. Update sys-catalog with the new table schema.
   TRACE("Updating metadata on disk");
   SysCatalogTable::Actions actions;
   actions.table_to_update = table.get();
@@ -1291,24 +1318,23 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
         Substitute("An error occurred while updating sys-catalog tables entry: $0",
                    s.ToString()));
     LOG(WARNING) << s.ToString();
-    if (req->has_new_table_name()) {
-      std::lock_guard<LockType> catalog_lock(lock_);
-      CHECK_EQ(table_names_map_.erase(req->new_table_name()), 1);
-    }
     CheckIfNoLongerLeaderAndSetupError(s, resp);
     return s;
   }
 
-  // 6. Remove the old name
+  // 7. Remove the old name and add the new name.
   if (req->has_new_table_name()) {
-    TRACE("Removing old-name $0 from by-name map", table_name);
+    TRACE("Replacing name $0 with $1 in by-name table map",
+          table_name, req->new_table_name());
     std::lock_guard<LockType> l_map(lock_);
     if (table_names_map_.erase(table_name) != 1) {
-      PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+      PANIC_RPC(rpc, Substitute(
+          "Could not remove table (name $0) from map", table_name));
     }
+    InsertOrDie(&table_names_map_, req->new_table_name(), table);
   }
 
-  // 7. Update the in-memory state
+  // 8. Update the in-memory state.
   TRACE("Committing in-memory state");
   l.Commit();
 
@@ -2736,8 +2762,8 @@ Status CatalogManager::ProcessPendingAssignments(
     s = SelectReplicasForTablet(ts_descs, tablet);
     if (!s.ok()) {
       s = s.CloneAndPrepend(Substitute(
-          "An error occured while selecting replicas for tablet $0: $1",
-          tablet->tablet_id(), s.ToString()));
+          "An error occured while selecting replicas for tablet $0",
+          tablet->tablet_id()));
       break;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index d6257b4..44f84c0 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -654,9 +654,19 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Tablet maps: tablet-id -> TabletInfo
   TabletInfoMap tablet_map_;
 
-  // Names of tables that are currently being created. Only used in
-  // table creation so that transient tables are not made visible.
-  std::unordered_set<std::string> tables_being_created_;
+  // Names of tables that are currently reserved by CreateTable() or
+  // AlterTable().
+  //
+  // As a rule, operations that add new table names should do so as follows:
+  // 1. Acquire lock_.
+  // 2. Ensure table_names_map_ does not contain the new name.
+  // 3. Ensure reserved_table_names_ does not contain the new name.
+  // 4. Add the new name to reserved_table_names_.
+  // 5. Release lock_.
+  // 6. Perform the operation.
+  // 7. If it succeeded, add the name to table_names_map_ with lock_ held.
+  // 8. Remove the new name from reserved_table_names_ with lock_ held.
+  std::unordered_set<std::string> reserved_table_names_;
 
   Master *master_;
   Atomic32 closing_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 015f2f8..87fb301 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1027,5 +1027,170 @@ TEST_F(MasterTest, TestConcurrentCreateOfSameTable) {
   }
 }
 
+TEST_F(MasterTest, TestConcurrentRenameOfSameTable) {
+  const char* kOldName = "testtb";
+  const char* kNewName = "testtb-new";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+  ASSERT_OK(CreateTable(kOldName, kTableSchema));
+
+  // Kick off a bunch of threads all trying to rename the same table.
+  vector<thread> threads;
+  for (int i = 0; i < 10; i++) {
+    threads.emplace_back([&]() {
+      AlterTableRequestPB req;
+      AlterTableResponsePB resp;
+      RpcController controller;
+
+      req.mutable_table()->set_table_name(kOldName);
+      req.set_new_table_name(kNewName);
+      CHECK_OK(proxy_->AlterTable(req, &resp, &controller));
+      SCOPED_TRACE(resp.DebugString());
+
+      // There are two expected outcomes:
+      //
+      // 1. This thread won the AlterTable() race: no error.
+      // 2. This thread lost the AlterTable() race: TABLE_NOT_FOUND error
+      //    with NotFound status.
+      if (resp.has_error()) {
+        Status s = StatusFromPB(resp.error().status());
+        string failure_msg = Substitute("Unexpected response: $0",
+                                        resp.DebugString());
+        CHECK_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code()) << failure_msg;
+        CHECK(s.IsNotFound()) << failure_msg;
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST_F(MasterTest, TestConcurrentCreateAndRenameOfSameTable) {
+  const char* kOldName = "testtb";
+  const char* kNewName = "testtb-new";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+  ASSERT_OK(CreateTable(kOldName, kTableSchema));
+
+  AtomicBool create_success(false);
+  AtomicBool rename_success(false);
+  vector<thread> threads;
+  for (int i = 0; i < 10; i++) {
+    if (i % 2) {
+      threads.emplace_back([&]() {
+        CreateTableRequestPB req;
+        CreateTableResponsePB resp;
+        RpcController controller;
+
+        req.set_name(kNewName);
+        RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
+
+        KuduPartialRow split1(&kTableSchema);
+        CHECK_OK(split1.SetInt32("key", 10));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, split1);
+
+        KuduPartialRow split2(&kTableSchema);
+        CHECK_OK(split2.SetInt32("key", 20));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, split2);
+
+        CHECK_OK(SchemaToPB(kTableSchema, req.mutable_schema()));
+        CHECK_OK(proxy_->CreateTable(req, &resp, &controller));
+        SCOPED_TRACE(resp.DebugString());
+
+        // There are three expected outcomes:
+        //
+        // 1. This thread finished well before the others: no error.
+        // 2. This thread raced with another thread: TABLE_NOT_FOUND error with
+        //    ServiceUnavailable status.
+        // 3. This thread finished well after the others: TABLE_ALREADY_PRESENT
+        //    error with AlreadyPresent status.
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          string failure_msg = Substitute("Unexpected response: $0",
+                                          resp.DebugString());
+          switch (resp.error().code()) {
+            case MasterErrorPB::TABLE_NOT_FOUND:
+              CHECK(s.IsServiceUnavailable()) << failure_msg;
+              break;
+            case MasterErrorPB::TABLE_ALREADY_PRESENT:
+              CHECK(s.IsAlreadyPresent()) << failure_msg;
+              break;
+            default:
+              FAIL() << failure_msg;
+          }
+        } else {
+          // Creating the table should only succeed once.
+          CHECK(!create_success.Exchange(true));
+        }
+      });
+    } else {
+      threads.emplace_back([&]() {
+        AlterTableRequestPB req;
+        AlterTableResponsePB resp;
+        RpcController controller;
+
+        req.mutable_table()->set_table_name(kOldName);
+        req.set_new_table_name(kNewName);
+        CHECK_OK(proxy_->AlterTable(req, &resp, &controller));
+        SCOPED_TRACE(resp.DebugString());
+
+        // There are three expected outcomes:
+        //
+        // 1. This thread finished well before the others: no error.
+        // 2. This thread raced with CreateTable(): TABLE_NOT_FOUND error with
+        //    ServiceUnavailable status (if raced during reservation stage)
+        //    or TABLE_ALREADY_PRESENT error with AlreadyPresent status (if
+        //    raced after reservation stage).
+        // 3. This thread raced with AlterTable() or finished well after the
+        //    others: TABLE_NOT_FOUND error with NotFound status.
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          string failure_msg = Substitute("Unexpected response: $0",
+                                          resp.DebugString());
+          switch (resp.error().code()) {
+            case MasterErrorPB::TABLE_NOT_FOUND:
+              CHECK(s.IsServiceUnavailable() || s.IsNotFound()) << failure_msg;
+              break;
+            case MasterErrorPB::TABLE_ALREADY_PRESENT:
+              CHECK(s.IsAlreadyPresent()) << failure_msg;
+              break;
+            default:
+              FAIL() << failure_msg;
+          }
+        } else {
+          // Renaming the table should only succeed once.
+          CHECK(!rename_success.Exchange(true));
+        }
+      });
+    }
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // At least one of rename or create should have failed; if both succeeded
+  // there must be some sort of race.
+  CHECK(!rename_success.Load() || !create_success.Load());
+
+  unordered_set<string> live_tables;
+  live_tables.insert(kNewName);
+  if (create_success.Load()) {
+    live_tables.insert(kOldName);
+  }
+  MasterMetadataVerifier verifier(live_tables, {});
+  SysCatalogTable* sys_catalog =
+      mini_master_->master()->catalog_manager()->sys_catalog();
+  ASSERT_OK(sys_catalog->VisitTables(&verifier));
+  ASSERT_OK(sys_catalog->VisitTablets(&verifier));
+  ASSERT_OK(verifier.Verify());
+}
+
 } // namespace master
 } // namespace kudu


Mime
View raw message