kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [4/9] kudu git commit: KUDU-1311 [master] support adding and dropping range partitions
Date Mon, 01 Aug 2016 22:11:41 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 7642ddd..f807661 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1122,10 +1122,10 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
   return Status::OK();
 }
 
-static Status ApplyAlterSteps(const SysTablesEntryPB& current_pb,
-                              const AlterTableRequestPB* req,
-                              Schema* new_schema,
-                              ColumnId* next_col_id) {
+Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
+                                             vector<AlterTableRequestPB::Step> steps,
+                                             Schema* new_schema,
+                                             ColumnId* next_col_id) {
   const SchemaPB& current_schema_pb = current_pb.schema();
   Schema cur_schema;
   RETURN_NOT_OK(SchemaFromPB(current_schema_pb, &cur_schema));
@@ -1135,7 +1135,7 @@ static Status ApplyAlterSteps(const SysTablesEntryPB& current_pb,
     builder.set_next_column_id(ColumnId(current_pb.next_column_id()));
   }
 
-  for (const AlterTableRequestPB::Step& step : req->alter_schema_steps()) {
+  for (const auto& step : steps) {
     switch (step.type()) {
       case AlterTableRequestPB::ADD_COLUMN: {
         if (!step.has_add_column()) {
@@ -1197,8 +1197,7 @@ static Status ApplyAlterSteps(const SysTablesEntryPB& current_pb,
       // TODO: EDIT_COLUMN
 
       default: {
-        return Status::InvalidArgument(
-          Substitute("Invalid alter step type: $0", step.type()));
+        return Status::InvalidArgument("Invalid alter schema step type", step.DebugString());
       }
     }
   }
@@ -1207,6 +1206,150 @@ static Status ApplyAlterSteps(const SysTablesEntryPB& current_pb,
   return Status::OK();
 }
 
+Status CatalogManager::ApplyAlterPartitioningSteps(
+    const TableMetadataLock& l,
+    TableInfo* table,
+    const Schema& client_schema,
+    vector<AlterTableRequestPB::Step> steps,
+    vector<scoped_refptr<TabletInfo>>* tablets_to_add,
+    vector<scoped_refptr<TabletInfo>>* tablets_to_drop) {
+
+  Schema schema;
+  RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
+  PartitionSchema partition_schema;
+  RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema));
+
+  map<string, TabletInfo*> existing_tablets = table->tablet_map();
+  map<string, scoped_refptr<TabletInfo>> new_tablets;
+
+  for (const auto& step : steps) {
+    vector<DecodedRowOperation> ops;
+    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
+      RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(),
+                                     &client_schema, &schema, nullptr);
+      RETURN_NOT_OK(decoder.DecodeOperations(&ops));
+    } else {
+      CHECK_EQ(step.type(), AlterTableRequestPB::DROP_RANGE_PARTITION);
+      RowOperationsPBDecoder decoder(&step.drop_range_partition().range_bounds(),
+                                     &client_schema, &schema, nullptr);
+      RETURN_NOT_OK(decoder.DecodeOperations(&ops));
+    }
+
+    if (ops.size() != 2) {
+      return Status::InvalidArgument("expected two row operations for alter range partition step",
+                                     step.ShortDebugString());
+    }
+
+    if (ops[0].type != RowOperationsPB::RANGE_LOWER_BOUND ||
+        ops[1].type != RowOperationsPB::RANGE_UPPER_BOUND) {
+      return Status::InvalidArgument(
+          "expected a lower bound and upper bound row op for alter range partition step",
+          strings::Substitute("$0, $1", ops[0].ToString(schema), ops[1].ToString(schema)));
+    }
+
+    vector<Partition> partitions;
+    RETURN_NOT_OK(partition_schema.CreatePartitions({}, {{ *ops[0].split_row, *ops[1].split_row }},
+                                                    schema, &partitions));
+
+    switch (step.type()) {
+      case AlterTableRequestPB::ADD_RANGE_PARTITION: {
+        for (const Partition& partition : partitions) {
+          const string& lower_bound = partition.partition_key_start();
+          const string& upper_bound = partition.partition_key_end();
+
+          // Check that the new tablet doesn't overlap with the existing tablets.
+          // Iter points at the tablet directly *after* the lower bound (or to
+          // existing_tablets.end(), if such a tablet does not exist).
+          auto existing_iter = existing_tablets.upper_bound(lower_bound);
+          if (existing_iter != existing_tablets.end()) {
+            TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
+            if (metadata.data().pb.partition().partition_key_start() < upper_bound) {
+              return Status::NotFound("New partition conflicts with existing partition",
+                                      step.ShortDebugString());
+            }
+          }
+          if (existing_iter != existing_tablets.begin()) {
+            TabletMetadataLock metadata(std::prev(existing_iter)->second, TabletMetadataLock::READ);
+            if (metadata.data().pb.partition().partition_key_end() > lower_bound) {
+              return Status::NotFound("New partition conflicts with existing partition",
+                                      step.ShortDebugString());
+            }
+          }
+
+          // Check that the new tablet doesn't overlap with any other new tablets.
+          auto new_iter = new_tablets.upper_bound(lower_bound);
+          if (new_iter != new_tablets.end()) {
+            const auto& metadata = new_iter->second->mutable_metadata()->dirty();
+            if (metadata.pb.partition().partition_key_start() < upper_bound) {
+              return Status::NotFound("New partition conflicts with another new partition",
+                                      step.ShortDebugString());
+            }
+          }
+          if (new_iter != new_tablets.begin()) {
+            const auto& metadata = std::prev(new_iter)->second->mutable_metadata()->dirty();
+            if (metadata.pb.partition().partition_key_end() > lower_bound) {
+              return Status::NotFound("New partition conflicts with another new partition",
+                                      step.ShortDebugString());
+            }
+          }
+
+          PartitionPB partition_pb;
+          partition.ToPB(&partition_pb);
+          new_tablets.emplace(lower_bound, CreateTabletInfo(table, partition_pb));
+        }
+        break;
+      }
+
+      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
+        for (const Partition& partition : partitions) {
+          const string& lower_bound = partition.partition_key_start();
+          const string& upper_bound = partition.partition_key_end();
+
+          // Iter points to the tablet if it exists, or the next tablet, or the end.
+          auto existing_iter = existing_tablets.lower_bound(lower_bound);
+          auto new_iter = new_tablets.lower_bound(lower_bound);
+
+          bool found_existing = false;
+          bool found_new = false;
+
+          if (existing_iter != existing_tablets.end()) {
+            TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
+            const auto& partition = metadata.data().pb.partition();
+            found_existing = partition.partition_key_start() == lower_bound ||
+                             partition.partition_key_end() == upper_bound;
+          }
+          if (new_iter != new_tablets.end()) {
+            const auto& partition = new_iter->second->mutable_metadata()->dirty().pb.partition();
+            found_new = partition.partition_key_start() == lower_bound ||
+                        partition.partition_key_end() == upper_bound;
+          }
+
+          DCHECK(!found_existing || !found_new);
+          if (found_existing) {
+            tablets_to_drop->emplace_back(existing_iter->second);
+            existing_tablets.erase(existing_iter);
+          } else if (found_new) {
+            new_tablets.erase(new_iter);
+          } else {
+            return Status::NotFound("No tablet found for drop partition step",
+                                    step.ShortDebugString());
+          }
+        }
+        break;
+      }
+      default: {
+        return Status::InvalidArgument("Unknown alter table partitioning step",
+                                       step.ShortDebugString());
+      }
+    }
+  }
+
+  for (auto& tablet : new_tablets) {
+    tablets_to_add->emplace_back(std::move(tablet.second));
+  }
+  return Status::OK();
+}
+
 Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                   AlterTableResponsePB* resp,
                                   rpc::RpcContext* rpc) {
@@ -1216,7 +1359,32 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   LOG(INFO) << "Servicing AlterTable request from " << RequestorString(rpc)
             << ": " << req->ShortDebugString();
 
-  // 1. Lookup the table and verify if it exists.
+  RETURN_NOT_OK(CheckOnline());
+
+  // 1. Group the steps into schema altering steps and partition altering steps.
+  vector<AlterTableRequestPB::Step> alter_schema_steps;
+  vector<AlterTableRequestPB::Step> alter_partitioning_steps;
+  for (const auto& step : req->alter_schema_steps()) {
+    switch (step.type()) {
+      case AlterTableRequestPB::ADD_COLUMN:
+      case AlterTableRequestPB::DROP_COLUMN:
+      case AlterTableRequestPB::RENAME_COLUMN: {
+        alter_schema_steps.emplace_back(step);
+        break;
+      }
+      case AlterTableRequestPB::ADD_RANGE_PARTITION:
+      case AlterTableRequestPB::DROP_RANGE_PARTITION: {
+        alter_partitioning_steps.emplace_back(step);
+        break;
+      }
+      case AlterTableRequestPB::ALTER_COLUMN:
+      case AlterTableRequestPB::UNKNOWN: {
+        return Status::InvalidArgument("Invalid alter step type", step.ShortDebugString());
+      }
+    }
+  }
+
+  // 2. Lookup the table, verify if it exists, and lock it for modification.
   TRACE("Looking up table");
   scoped_refptr<TableInfo> table;
   RETURN_NOT_OK(FindTable(req->table(), &table));
@@ -1234,8 +1402,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return s;
   }
 
-  // 2. Having locked the table, look it up again, in case we raced with
-  //    another AlterTable() that renamed our table.
+  // 3. Having locked the table, look it up again, in case we raced with another
+  //    AlterTable() that renamed our table.
   {
     scoped_refptr<TableInfo> table_again;
     CHECK_OK(FindTable(req->table(), &table_again));
@@ -1246,15 +1414,14 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     }
   }
 
-  bool has_changes = false;
   string table_name = l.data().name();
 
-  // 3. Calculate new schema for the on-disk state, not persisted yet.
+  // 4. Calculate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
-  if (req->alter_schema_steps_size()) {
+  if (!alter_schema_steps.empty()) {
     TRACE("Apply alter schema");
-    Status s = ApplyAlterSteps(l.data().pb, req, &new_schema, &next_col_id);
+    Status s = ApplyAlterSchemaSteps(l.data().pb, alter_schema_steps, &new_schema, &next_col_id);
     if (!s.ok()) {
       SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
       return s;
@@ -1262,10 +1429,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     DCHECK_NE(next_col_id, 0);
     DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
               static_cast<int>(Schema::kColumnNotFound));
-    has_changes = true;
   }
 
-  // 4. Try to acquire the new table name.
+  // 5. Try to acquire the new table name.
   if (req->has_new_table_name()) {
     std::lock_guard<LockType> catalog_lock(lock_);
     TRACE("Acquired catalog manager lock");
@@ -1288,7 +1454,6 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     }
 
     l.mutable_data()->pb.set_name(req->new_table_name());
-    has_changes = true;
   }
 
   // Ensure that we drop our reservation upon return.
@@ -1299,29 +1464,80 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     }
   });
 
+  // 6. Alter table partitioning.
+  vector<scoped_refptr<TabletInfo>> tablets_to_add;
+  vector<scoped_refptr<TabletInfo>> tablets_to_drop;
+  if (!alter_partitioning_steps.empty()) {
+    TRACE("Apply alter partitioning");
+    Schema client_schema;
+    RETURN_NOT_OK(SchemaFromPB(req->schema(), &client_schema));
+    Status s = ApplyAlterPartitioningSteps(l, table.get(), client_schema, alter_partitioning_steps,
+                                           &tablets_to_add, &tablets_to_drop);
+    if (!s.ok()) {
+      SetupError(resp->mutable_error(), MasterErrorPB::UNKNOWN_ERROR, s);
+      return s;
+    }
+  }
+
+  // Set to true if columns are altered, added or dropped.
+  bool has_schema_changes = !alter_schema_steps.empty();
+  // Set to true if there are schema changes, or the table is renamed.
+  bool has_metadata_changes = has_schema_changes || req->has_new_table_name();
+  // Set to true if there are partitioning changes.
+  bool has_partitioning_changes = !alter_partitioning_steps.empty();
+  // Set to true if metadata changes need to be applied to existing tablets.
+  bool has_metadata_changes_for_existing_tablets =
+    has_metadata_changes && table->num_tablets() > tablets_to_drop.size();
+
   // Skip empty requests...
-  if (!has_changes) {
+  if (!has_metadata_changes && !has_partitioning_changes) {
     return Status::OK();
   }
 
-  // 5. Serialize the schema Increment the version number.
-  if (new_schema.initialized()) {
-    if (!l.data().pb.has_fully_applied_schema()) {
-      l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
-    }
+  // 7. Serialize the schema and increment the version number.
+  if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
+    l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
+  }
+  if (has_schema_changes) {
     CHECK_OK(SchemaToPB(new_schema, l.mutable_data()->pb.mutable_schema()));
   }
-  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
-  l.mutable_data()->pb.set_next_column_id(next_col_id);
-  l.mutable_data()->set_state(SysTablesEntryPB::ALTERING,
-                              Substitute("Alter Table version=$0 ts=$1",
-                                         l.mutable_data()->pb.version(),
-                                         LocalTimeAsString()));
+  if (has_metadata_changes) {
+    l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
+    l.mutable_data()->pb.set_next_column_id(next_col_id);
+  }
+  if (!tablets_to_add.empty() || has_metadata_changes_for_existing_tablets) {
+    // If some tablet schemas need to be updated or there are any new tablets,
+    // set the table state to ALTERING, so that IsAlterTableDone RPCs will wait
+    // for the schema updates and tablets to be running.
+    l.mutable_data()->set_state(SysTablesEntryPB::ALTERING,
+                                Substitute("Alter Table version=$0 ts=$1",
+                                           l.mutable_data()->pb.version(),
+                                           LocalTimeAsString()));
+  }
 
-  // 6. Update sys-catalog with the new table schema.
+  // 8. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
+  string deletion_msg = "Partition dropped at " + LocalTimeAsString();
   SysCatalogTable::Actions actions;
-  actions.table_to_update = table.get();
+  if (!tablets_to_add.empty() || has_metadata_changes) {
+    // If anything modified the table's persistent metadata, then sync it to the sys catalog.
+    actions.table_to_update = table.get();
+  }
+  for (const auto& tablet : tablets_to_add) {
+    actions.tablets_to_add.push_back(tablet.get());
+  }
+
+  ScopedTabletInfoCommitter tablets_to_add_committer(ScopedTabletInfoCommitter::LOCKED);
+  ScopedTabletInfoCommitter tablets_to_drop_committer(ScopedTabletInfoCommitter::UNLOCKED);
+  tablets_to_add_committer.AddTablets(tablets_to_add);
+  tablets_to_drop_committer.AddTablets(tablets_to_drop);
+  tablets_to_drop_committer.LockTabletsForWriting();
+  for (auto& tablet : tablets_to_drop) {
+    tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
+                                                           deletion_msg);
+    actions.tablets_to_update.push_back(tablet.get());
+  }
+
   Status s = sys_catalog_->Write(actions);
   if (!s.ok()) {
     s = s.CloneAndPrepend(
@@ -1329,26 +1545,65 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                    s.ToString()));
     LOG(WARNING) << s.ToString();
     CheckIfNoLongerLeaderAndSetupError(s, resp);
+    tablets_to_add_committer.Abort();
+    tablets_to_drop_committer.Abort();
     return s;
   }
 
-  // 7. Remove the old name and add the new name.
-  if (req->has_new_table_name()) {
-    TRACE("Replacing name $0 with $1 in by-name table map",
-          table_name, req->new_table_name());
-    std::lock_guard<LockType> l_map(lock_);
-    if (table_names_map_.erase(table_name) != 1) {
-      PANIC_RPC(rpc, Substitute(
-          "Could not remove table (name $0) from map", table_name));
+  // 9. Commit the in-memory state.
+  {
+    TRACE("Committing alterations to in-memory state");
+    // Commit new tablet in-memory state. This doesn't require taking the global
+    // lock since the new tablets are not yet visible, because they haven't been
+    // added to the table or tablet index.
+    tablets_to_add_committer.Commit();
+
+    // Take the global catalog manager lock in order to modify the global table
+    // and tablets indices.
+    std::lock_guard<LockType> lock(lock_);
+    if (req->has_new_table_name()) {
+      if (table_names_map_.erase(table_name) != 1) {
+        PANIC_RPC(rpc, Substitute(
+            "Could not remove table (name $0) from map", table_name));
+      }
+      InsertOrDie(&table_names_map_, req->new_table_name(), table);
+    }
+
+    // Insert new tablets into the global tablet map. After this, the tablets
+    // will be visible in GetTabletLocations RPCs.
+    for (const auto& tablet : tablets_to_add) {
+      InsertOrDie(&tablet_map_, tablet->tablet_id(), std::move(tablet));
     }
-    InsertOrDie(&table_names_map_, req->new_table_name(), table);
   }
 
-  // 8. Update the in-memory state.
-  TRACE("Committing in-memory state");
-  l.Commit();
+  // Add and remove new tablets from the table. This makes the tablets visible
+  // to GetTableLocations RPCs. This doesn't need to happen under the global
+  // lock, since:
+  //  * clients can not know the new tablet IDs, so GetTabletLocations RPCs
+  //    are impossible.
+  //  * the new tablets can not heartbeat yet, since they don't get created
+  //    until further down.
+  table->AddRemoveTablets(tablets_to_add, tablets_to_drop);
+
+  // Commit state change for dropped tablets. This comes after removing the
+  // tablets from their associated tables so that if a GetTableLocations or
+  // GetTabletLocations returns a deleted tablet, the retry will never include
+  // the tablet again.
+  tablets_to_drop_committer.Commit();
+
+  if (!tablets_to_add.empty() || has_metadata_changes) {
+    l.Commit();
+  } else {
+    l.Unlock();
+  }
 
   SendAlterTableRequest(table);
+  for (const auto& tablet : tablets_to_drop) {
+    TabletMetadataLock l(tablet.get(), TabletMetadataLock::READ);
+    SendDeleteTabletRequest(tablet, l, deletion_msg);
+  }
+
+  background_tasks_->Wake();
   return Status::OK();
 }
 
@@ -1404,7 +1659,7 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
   if (l.data().pb.has_fully_applied_schema()) {
     // An AlterTable is in progress; fully_applied_schema is the last
     // schema that has reached every TS.
-    CHECK(l.data().pb.state() == SysTablesEntryPB::ALTERING);
+    CHECK_EQ(SysTablesEntryPB::ALTERING, l.data().pb.state());
     resp->mutable_schema()->CopyFrom(l.data().pb.fully_applied_schema());
   } else {
     // There's no AlterTable, the regular schema is "fully applied".
@@ -2645,7 +2900,7 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
   // The "tablet creation" was already sent, but we didn't receive an answer
   // within the timeout. So the tablet will be replaced by a new one.
   scoped_refptr<TabletInfo> replacement = CreateTabletInfo(tablet->table().get(),
-                                             old_info.pb.partition());
+                                                           old_info.pb.partition());
   LOG(WARNING) << "Tablet " << tablet->ToString() << " was not created within "
                << "the allowed timeout. Replacing with a new tablet "
                << replacement->tablet_id();
@@ -3076,14 +3331,18 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
     if (s.ok()) {
       continue;
     } else if (s.IsNotFound()) {
-      // The tablet has been deleted; filter it from the results.
-      resp->mutable_tablet_locations()->RemoveLast();
-    } else if (s.IsServiceUnavailable()) {
-      // The tablet is not yet running; fail the request.
+      // The tablet has been deleted; force the client to retry. This is a
+      // transient state that only happens with a concurrent drop range
+      // partition alter table operation.
       resp->Clear();
       resp->mutable_error()->set_code(MasterErrorPB_Code::MasterErrorPB_Code_TABLET_NOT_RUNNING);
       StatusToPB(Status::ServiceUnavailable("Tablet not running"),
                  resp->mutable_error()->mutable_status());
+    } else if (s.IsServiceUnavailable()) {
+      // The tablet is not yet running; fail the request.
+      resp->Clear();
+      resp->mutable_error()->set_code(MasterErrorPB_Code::MasterErrorPB_Code_TABLET_NOT_RUNNING);
+      StatusToPB(s, resp->mutable_error()->mutable_status());
       break;
     } else {
       LOG(FATAL) << "Unexpected error while building tablet locations: " << s.ToString();
@@ -3325,7 +3584,7 @@ std::string TableInfo::ToString() const {
 
 bool TableInfo::RemoveTablet(const std::string& partition_key_start) {
   std::lock_guard<simple_spinlock> l(lock_);
-  return EraseKeyReturnValuePtr(&tablet_map_, partition_key_start) != NULL;
+  return EraseKeyReturnValuePtr(&tablet_map_, partition_key_start) != nullptr;
 }
 
 void TableInfo::AddTablet(TabletInfo *tablet) {
@@ -3340,6 +3599,18 @@ void TableInfo::AddTablets(const vector<TabletInfo*>& tablets) {
   }
 }
 
+void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablets_to_add,
+                                 const vector<scoped_refptr<TabletInfo>>& tablets_to_drop) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  for (const auto& tablet : tablets_to_drop) {
+    const auto& lower_bound = tablet->metadata().state().pb.partition().partition_key_start();
+    CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
+  }
+  for (const auto& tablet : tablets_to_add) {
+    AddTabletUnlocked(tablet.get());
+  }
+}
+
 void TableInfo::AddTabletUnlocked(TabletInfo* tablet) {
   TabletInfo* old = nullptr;
   if (UpdateReturnCopy(&tablet_map_,
@@ -3454,7 +3725,7 @@ void TableInfo::GetTaskList(std::vector<scoped_refptr<MonitoredTask> > *ret) {
 void TableInfo::GetAllTablets(vector<scoped_refptr<TabletInfo> > *ret) const {
   ret->clear();
   std::lock_guard<simple_spinlock> l(lock_);
-  for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
+  for (const auto& e : tablet_map_) {
     ret->push_back(make_scoped_refptr(e.second));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 88a941b..e348d98 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -179,6 +179,7 @@ struct PersistentTableInfo {
 class TableInfo : public RefCountedThreadSafe<TableInfo> {
  public:
   typedef PersistentTableInfo cow_state;
+  typedef std::map<std::string, TabletInfo*> TabletInfoMap;
 
   explicit TableInfo(std::string table_id);
 
@@ -192,6 +193,10 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   // Add multiple tablets to this table.
   void AddTablets(const std::vector<TabletInfo*>& tablets);
 
+  // Atomically add and remove multiple tablets from this table.
+  void AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablets_to_add,
+                        const vector<scoped_refptr<TabletInfo>>& tablets_to_drop);
+
   // Return true if tablet with 'partition_key_start' has been
   // removed from 'tablet_map_' below.
   bool RemoveTablet(const std::string& partition_key_start);
@@ -200,6 +205,7 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   void GetTabletsInRange(const GetTableLocationsRequestPB* req,
                          std::vector<scoped_refptr<TabletInfo> > *ret) const;
 
+  // Adds all tablets to the vector in partition key sorted order.
   void GetAllTablets(std::vector<scoped_refptr<TabletInfo> > *ret) const;
 
   // Access the persistent metadata. Typically you should use
@@ -221,6 +227,18 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   // Allow for showing outstanding tasks in the master UI.
   void GetTaskList(std::vector<scoped_refptr<MonitoredTask> > *tasks);
 
+  // Returns a snapshot copy of the table info's tablet map.
+  TabletInfoMap tablet_map() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return tablet_map_;
+  }
+
+  // Returns the number of tablets.
+  int num_tablets() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return tablet_map_.size();
+  }
+
  private:
   friend class RefCountedThreadSafe<TableInfo>;
   ~TableInfo();
@@ -231,7 +249,6 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
 
   // Sorted index of tablet start partition-keys to TabletInfo.
   // The TabletInfo objects are owned by the CatalogManager.
-  typedef std::map<std::string, TabletInfo *> TabletInfoMap;
   TabletInfoMap tablet_map_;
 
   // Protects tablet_map_ and pending_tasks_
@@ -570,6 +587,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
+  Status ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
+                               std::vector<AlterTableRequestPB::Step> steps,
+                               Schema* new_schema,
+                               ColumnId* next_col_id);
+
+  Status ApplyAlterPartitioningSteps(const TableMetadataLock& l,
+                                     TableInfo* table,
+                                     const Schema& client_schema,
+                                     std::vector<AlterTableRequestPB::Step> steps,
+                                     std::vector<scoped_refptr<TabletInfo>>* tablets_to_add,
+                                     std::vector<scoped_refptr<TabletInfo>>* tablets_to_drop);
+
   // Task that takes care of the tablet assignments/creations.
   // Loops through the "not created" tablets and sends a CreateTablet() request.
   Status ProcessPendingAssignments(const std::vector<scoped_refptr<TabletInfo> >& tablets);

http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index a5292e1..cae4dae 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -433,6 +433,8 @@ message AlterTableRequestPB {
     // TODO(KUDU-861): this will subsume RENAME_COLUMN, but not yet implemented
     // on the master side.
     ALTER_COLUMN = 4;
+    ADD_RANGE_PARTITION = 5;
+    DROP_RANGE_PARTITION = 6;
   }
   message AddColumn {
     // The schema to add.
@@ -449,6 +451,16 @@ message AlterTableRequestPB {
     required string old_name = 1;
     required string new_name = 2;
   }
+  message AddRangePartition {
+    // A set of row operations containing the lower and upper range bound for
+    // the range partition to add or drop.
+    optional RowOperationsPB range_bounds = 1;
+  }
+  message DropRangePartition {
+    // A set of row operations containing the lower and upper range bound for
+    // the range partition to add or drop.
+    optional RowOperationsPB range_bounds = 1;
+  }
 
   message Step {
     optional StepType type = 1 [ default = UNKNOWN ];
@@ -457,11 +469,17 @@ message AlterTableRequestPB {
     optional AddColumn add_column = 2;
     optional DropColumn drop_column = 3;
     optional RenameColumn rename_column = 4;
+    optional AddRangePartition add_range_partition = 5;
+    optional DropRangePartition drop_range_partition = 6;
   }
 
   required TableIdentifierPB table = 1;
   repeated Step alter_schema_steps = 2;
   optional string new_table_name = 3;
+
+  // The table schema to use when decoding the range bound row operations. Only
+  // necessary when partitions are being added or dropped.
+  optional SchemaPB schema = 4;
 }
 
 message AlterTableResponsePB {
@@ -572,6 +590,8 @@ enum MasterFeatures {
   UNKNOWN_FEATURE = 0;
   // The master supports creating tables with non-covering range partitions.
   RANGE_PARTITION_BOUNDS = 1;
+  // The master supports adding and dropping range partitions.
+  ADD_DROP_RANGE_PARTITIONS = 2;
 }
 
 service MasterService {

http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 23dc37b..d39547a 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -327,7 +327,11 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
 }
 
 bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
-  return feature == MasterFeatures::RANGE_PARTITION_BOUNDS;
+  switch (feature) {
+    case MasterFeatures::RANGE_PARTITION_BOUNDS:
+    case MasterFeatures::ADD_DROP_RANGE_PARTITIONS: return true;
+    default: return false;
+  }
 }
 
 } // namespace master

http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index f542c14..279940d 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -338,7 +338,7 @@ TEST_F(KsckTest, TestOneSmallReplicatedTable) {
   ksck_->set_table_filters({"xyz"});
   ASSERT_OK(RunKsck());
   Status s = ksck_->ChecksumData(ChecksumOptions());
-  EXPECT_EQ("Not found: No tablet replicas found. Filter: table_filters=xyz", s.ToString());
+  EXPECT_EQ("Not found: No table found. Filter: table_filters=xyz", s.ToString());
   ASSERT_STR_CONTAINS(err_stream_.str(),
                       "INFO: The cluster doesn't have any matching tables");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/232474a5/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 28e4522..7d008eb 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -382,10 +382,14 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
   typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap;
   TabletTableMap tablet_table_map;
 
+  int num_tables = 0;
+  int num_tablets = 0;
   int num_tablet_replicas = 0;
   for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
     VLOG(1) << "Table: " << table->name();
     if (!MatchesAnyPattern(table_filters_, table->name())) continue;
+    num_tables += 1;
+    num_tablets += table->tablets().size();
     for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
       VLOG(1) << "Tablet: " << tablet->id();
       if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue;
@@ -393,7 +397,18 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
       num_tablet_replicas += tablet->replicas().size();
     }
   }
-  if (num_tablet_replicas == 0) {
+
+  if (num_tables == 0) {
+    string msg = "No table found.";
+    if (!table_filters_.empty()) {
+      msg += " Filter: table_filters=" + JoinStrings(table_filters_, ",");
+    }
+    return Status::NotFound(msg);
+  }
+
+  if (num_tablets > 0 && num_tablet_replicas == 0) {
+    // Warn if the table has tablets, but no replicas. The table may have no
+    // tablets if all range partitions have been dropped.
     string msg = "No tablet replicas found.";
     if (!table_filters_.empty() || !tablet_id_filters_.empty()) {
       msg += " Filter: ";
@@ -541,10 +556,6 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
                    return MatchesAnyPattern(tablet_id_filters_, t->id());
                  });
 
-  if (tablets.empty()) {
-    Info() << Substitute("Table $0 has 0 matching tablets", table->name()) << endl;
-    return true;
-  }
   int table_num_replicas = table->num_replicas();
   VLOG(1) << Substitute("Verifying $0 tablets for table $1 configured with num_replicas = $2",
                         tablets.size(), table->name(), table_num_replicas);


Mime
View raw message