From commits-return-7732-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Tue Jul 30 04:07:35 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8AF1A18063F for ; Tue, 30 Jul 2019 06:07:34 +0200 (CEST) Received: (qmail 14524 invoked by uid 500); 30 Jul 2019 04:07:33 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 14515 invoked by uid 99); 30 Jul 2019 04:07:33 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jul 2019 04:07:33 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 9CC4B85EAB; Tue, 30 Jul 2019 04:07:33 +0000 (UTC) Date: Tue, 30 Jul 2019 04:07:33 +0000 To: "commits@kudu.apache.org" Subject: [kudu] branch master updated: KUDU-2881 Support create/drop range partition by command line MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156445965357.15674.11227066023419452192@gitbox.apache.org> From: awong@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kudu X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: f98748c8dd33bac2595b66f281944094f0754d16 X-Git-Newrev: 58e01492320f954cd173d5ad3638d8f0530eb086 X-Git-Rev: 58e01492320f954cd173d5ad3638d8f0530eb086 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git The following commit(s) were added to refs/heads/master by this push: new 58e0149 KUDU-2881 Support create/drop range partition by command line 58e0149 is described below commit 58e01492320f954cd173d5ad3638d8f0530eb086 Author: honeyhexin AuthorDate: Sun Jul 28 15:29:07 2019 +0800 KUDU-2881 Support create/drop range partition by command line Sometimes we need to drop a range partition and then recreate it so that it can be rewritten, in case the partition was written incorrectly. This patch adds support for adding/dropping a range partition from the command line. The commands can be used as: 1. kudu table add_range_partition <master_addresses> <table_name> <lower_bound> <upper_bound> [-lower_bound_type] [-upper_bound_type] 2. kudu table drop_range_partition <master_addresses> <table_name> <lower_bound> <upper_bound> [-lower_bound_type] [-upper_bound_type] The lower_bound and upper_bound parameters can be empty, which means the default value will be used. If both of these parameters are empty, the range partition is unbounded. The lower_bound_type and upper_bound_type parameters are optional. If these two parameters are not specified, the default values are INCLUSIVE_BOUND and EXCLUSIVE_BOUND respectively. Both of these parameters are case-insensitive. 
Change-Id: I647d9c3cf01807bd8ae9f8cf4091568ea6f1161c Reviewed-on: http://gerrit.cloudera.org:8080/13833 Reviewed-by: Andrew Wong Tested-by: Kudu Jenkins --- src/kudu/common/partition.cc | 14 + src/kudu/common/partition.h | 6 + src/kudu/tools/kudu-admin-test.cc | 507 ++++++++++++++++++++++++++++++++++++ src/kudu/tools/tool_action_table.cc | 225 ++++++++++++++++ 4 files changed, 752 insertions(+) diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc index 616b75d..f406965 100644 --- a/src/kudu/common/partition.cc +++ b/src/kudu/common/partition.cc @@ -1239,4 +1239,18 @@ Status PartitionSchema::MakeUpperBoundRangePartitionKeyExclusive(KuduPartialRow* return Status::OK(); } +Status PartitionSchema::GetRangeSchemaColumnIndexes(const Schema& schema, + vector* range_column_idxs) const { + for (const ColumnId& column_id : range_schema_.column_ids) { + int32_t idx = schema.find_column_by_id(column_id); + if (idx == Schema::kColumnNotFound) { + return Status::InvalidArgument(Substitute("range partition column ID $0 " + "not found in range partition key schema.", + column_id)); + } + range_column_idxs->push_back(idx); + } + return Status::OK(); +} + } // namespace kudu diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h index 47baf93..d8e499b 100644 --- a/src/kudu/common/partition.h +++ b/src/kudu/common/partition.h @@ -257,6 +257,12 @@ class PartitionSchema { return hash_bucket_schemas_; } + // Gets the vector containing the column indexes of the range partition keys. + // If any of the columns is not in the key range columns then an + // InvalidArgument status is returned. 
+ Status GetRangeSchemaColumnIndexes(const Schema& schema, + std::vector* range_column_idxs) const; + private: friend class PartitionPruner; FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds); diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc index 1110ec3..b19b0aa 100644 --- a/src/kudu/tools/kudu-admin-test.cc +++ b/src/kudu/tools/kudu-admin-test.cc @@ -35,6 +35,7 @@ #include #include +#include "kudu/client/client-test-util.h" #include "kudu/client/client.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" @@ -50,6 +51,7 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/strip.h" @@ -2341,5 +2343,510 @@ TEST_F(AdminCliTest, TestExtraConfig) { } } +// Insert num_rows into table from start key to (start key + num_rows). +// The other two columns of the table are specified as fixed value. +// If the insert value is out of the range partition of the table, +// the function will return IOError which as we expect. 
+static Status InsertTestRows(const client::sp::shared_ptr& client, + const string& table_name, + int start_key, + int num_rows) { + client::sp::shared_ptr table; + RETURN_NOT_OK(client->OpenTable(table_name, &table)); + auto session = client->NewSession(); + for (int i = start_key; i < num_rows + start_key; ++i) { + unique_ptr insert(table->NewInsert()); + auto* row = insert->mutable_row(); + RETURN_NOT_OK(row->SetInt32("key", i)); + RETURN_NOT_OK(row->SetInt32("int_val", 12345)); + RETURN_NOT_OK(row->SetString("string_val", "hello")); + Status result = session->Apply(insert.release()); + if (result.IsIOError()) { + vector errors; + ElementDeleter drop(&errors); + bool overflowed; + session->GetPendingErrors(&errors, &overflowed); + EXPECT_FALSE(overflowed); + EXPECT_EQ(1, errors.size()); + EXPECT_TRUE(errors[0]->status().IsNotFound()); + return Status::NotFound(Substitute("No range partition covers the insert value $0", i)); + } + RETURN_NOT_OK(result); + } + RETURN_NOT_OK(session->Flush()); + RETURN_NOT_OK(session->Close()); + return Status::OK(); +} + +TEST_F(AdminCliTest, TestAddAndDropUnboundedPartition) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + + NO_FATALS(BuildAndStart()); + + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + client::sp::shared_ptr table; + + // At first, the range partition is unbounded, we can insert any data into it. + // We insert 100 rows with key range from 0 to 100, now there are 100 rows. + int num_rows = 100; + NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows)); + ASSERT_OK(client_->OpenTable(kTableId, &table)); + ASSERT_EQ(100, CountTableRows(table.get())); + + // For the unbounded range partition table, add any range partition will + // conflict with it, so we need to drop unbounded range partition first and + // then add range partition for it. Since the table is unbounded, it will + // drop all rows when dropping range partition. 
After dropping there will + // be 0 rows left. + string stdout, stderr; + Status s = RunKuduTool({ + "table", + "drop_range_partition", + master_addr, + kTableId, + "[]", + "[]", + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + ASSERT_OK(client_->OpenTable(kTableId, &table)); + ASSERT_EQ(0, CountTableRows(table.get())); + + // Since the unbounded partition has been dropped, now we can add a new unbounded + // range parititon for the table. + s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTableId, + "[]", + "[]", + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + + // Insert 100 rows with key range from 0 to 100, + // now there are 100 rows again. + NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows)); + ASSERT_OK(client_->OpenTable(kTableId, &table)); + ASSERT_EQ(100, CountTableRows(table.get())); +} + +TEST_F(AdminCliTest, TestAddAndDropRangePartition) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + + NO_FATALS(BuildAndStart()); + + // The kTableId's range partition is unbounded, so we need to create another table to + // add or drop range partition. + const string kTestTableName = "TestTable0"; + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + + // Build the schema. + KuduSchema schema; + KuduSchemaBuilder builder; + builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull(); + builder.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull(); + builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull(); + builder.SetPrimaryKey({ "key" }); + ASSERT_OK(builder.Build(&schema)); + + // Set up partitioning and create the table. 
+ unique_ptr lower_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt32("key", 0)); + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(upper_bound->SetInt32("key", 100)); + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTestTableName) + .schema(&schema) + .set_range_partition_columns({ "key" }) + .add_range_partition(lower_bound.release(), upper_bound.release()) + .num_replicas(FLAGS_num_replicas) + .Create()); + + client::sp::shared_ptr table; + + // Lambda function to add range partition using kudu CLI. + const auto add_range_partition_using_CLI = [&] (const string& lower_bound_json, + const string& upper_bound_json, + const string& lower_bound_type, + const string& upper_bound_type) -> Status { + string error, out; + Status s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTestTableName, + lower_bound_json, + upper_bound_json, + Substitute("-lower_bound_type=$0", lower_bound_type), + Substitute("-upper_bound_type=$0", upper_bound_type), + }, &out, &error); + return s; + }; + + // Lambda function to drop range partition using kudu CLI. + const auto drop_range_partition_using_CLI = [&] (const string& lower_bound_json, + const string& upper_bound_json, + const string& lower_bound_type, + const string& upper_bound_type) -> Status { + string error, out; + Status s = RunKuduTool({ + "table", + "drop_range_partition", + master_addr, + kTestTableName, + lower_bound_json, + upper_bound_json, + Substitute("-lower_bound_type=$0", lower_bound_type), + Substitute("-upper_bound_type=$0", upper_bound_type), + }, &out, &error); + return s; + }; + + const auto check_bounds = [&] (const string& lower_bound, + const string& upper_bound, + const string& lower_bound_type, + const string& upper_bound_type, + int start_row_to_insert, + int num_rows_to_insert, + vector out_of_range_rows_to_insert) { + string lower_bound_type_internal = lower_bound_type.empty() ? 
"INCLUSIVE_BOUND" : + lower_bound_type; + + string upper_bound_type_internal = upper_bound_type.empty() ? "EXCLUSIVE_BOUND" : + upper_bound_type; + + // Add range partition. + ASSERT_OK(add_range_partition_using_CLI(lower_bound, upper_bound, + lower_bound_type_internal, + upper_bound_type_internal)); + + // Insert num_rows_to_insert rows to table. + ASSERT_OK(InsertTestRows(client_, kTestTableName, start_row_to_insert, + num_rows_to_insert)); + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EQ(num_rows_to_insert, CountTableRows(table.get())); + + // Insert rows outside range partition, + // which will return error in lambda as we expect. + for (auto& value: out_of_range_rows_to_insert) { + EXPECT_TRUE(InsertTestRows(client_, kTestTableName, value, 1).IsNotFound()); + } + + // Drop range partition. + ASSERT_OK(drop_range_partition_using_CLI(lower_bound, upper_bound, + lower_bound_type_internal, + upper_bound_type_internal)); + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + + // Verify no rows are left. + ASSERT_EQ(0, CountTableRows(table.get())); + }; + + { + // Test specifying the range bound type in lower-case. + + // Drop the range partition added when create table, the range partition is + // [0,100). Insert 100 rows, data range is 0-99. Now there are 100 rows. + NO_FATALS(InsertTestRows(client_, kTestTableName, 0, 100)); + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EQ(100, CountTableRows(table.get())); + + // Drop range partition of [0,100) by command line, now there are 0 rows left. + ASSERT_OK(drop_range_partition_using_CLI("[0]", "[100]", "inclusive_bound", + "exclusive_bound")); + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EQ(0, CountTableRows(table.get())); + } + + { + // Test adding (INCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition. + + // Adding [100,200), 100 is inclusive and 200 is exclusive. Then insert + // 100 rows , the data range is [100-199]. 
Insert 99 and 200 to test + // insert out of range row. Last, dropping the range partition and checking + // that there are 0 rows left. + check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 100, 100, + { 99, 200 }); + } + + { + // Test adding (INCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition. + + // Adding [100,200], both 100 and 200 are inclusive. Then insert 101 + // rows, the data range is [100,200]. Insert 99 and 201 to test insert + // out of range row. Last, dropping the range partition and checking + // that there are 0 rows left. + check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "INCLUSIVE_BOUND", 100, 101, + { 99, 201 }); + } + + + { + // Test adding (EXCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition. + + // Adding (100,200], 100 is exclusive while 200 is inclusive.Then insert + // 100 rows, the data range is (100,200]. Insert 100 and 201 to test + // insert out of range row. Last, dropping the range partition and checking + // that there are 0 rows left. + check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "INCLUSIVE_BOUND", 101, 100, + { 100, 201 }); + } + + { + // Test adding (EXCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition. + + // Adding (100,200), both 100 and 200 are exclusive.Then insert 99 rows, + // the data range is (100,200). Insert 100 and 200 to test insert out of + // range row. Last, dropping the range partition and checking that there + // are 0 rows left. + check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 101, 99, + { 100, 200 }); + } + + { + // Test adding (INCLUSIVE_BOUND, UNBOUNDED) range partition. + + // Adding (1,unbouded), lower range bound is 1, upper range bound is unbounded, + // 1 is inclusive. Then insert 100 rows, the data range is [1-100]. Insert 0 + // to test insert out of range row. Last, dropping the range partition and + // checking that there are 0 rows left. 
+ check_bounds("[1]", "[]", "INCLUSIVE_BOUND", "", 1, 100, + { 0 }); + } + + { + // Test adding (EXCLUSIVE_BOUND, UNBOUNDED) range partition. + + // Adding (0,unbouded), lower range bound is 0, upper range bound is unbounded, + // 0 is exclusive. Then insert 100 rows, the data range + // is [2-101]. Insert 1 to test insert out of range row. Last, dropping the range + // partition and checking that there are 0 rows left. + check_bounds("[1]", "[]", "EXCLUSIVE_BOUND", "", 2, 100, + { 1 }); + } + + { + // Test adding (UNBOUNDED, INCLUSIVE_BOUND) range partition. + + // Adding (unbouded,100), lower range bound unbound, upper range bound is 100, + // 100 is inclusive. Then insert 100 rows, the data range + // is [1-100]. Insert 101 to test insert out of range row. Last, dropping the range + // partition and checking that there are 0 rows left. + check_bounds("[]", "[100]", "", "INCLUSIVE_BOUND", 1, 100, + { 101 }); + } + + { + // Test adding (UNBOUNDED, EXCLUSIVE_BOUND) range partition. + + // Adding (unbouded,100), lower range bound unbound, upper range bound is 100, + // 100 is exclusive. Then insert 100 rows, the data range + // is [0-99]. Insert 100 to test insert out of range row. Last, dropping the range + // partition and checking that there are 0 rows left. + check_bounds("[]", "[100]", "", "EXCLUSIVE_BOUND", 0, 100, + { 100 }); + } +} + +TEST_F(AdminCliTest, TestAddAndDropRangePartitionWithWrongParameters) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + + NO_FATALS(BuildAndStart()); + + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + const string kTestTableName = "TestTable1"; + + // Build the schema. 
+ KuduSchema schema; + KuduSchemaBuilder builder; + builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull(); + builder.SetPrimaryKey({ "key" }); + ASSERT_OK(builder.Build(&schema)); + + unique_ptr lower_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt32("key", 0)); + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(upper_bound->SetInt32("key", 1)); + + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTestTableName) + .schema(&schema) + .set_range_partition_columns({ "key" }) + .add_range_partition(lower_bound.release(), upper_bound.release()) + .num_replicas(FLAGS_num_replicas) + .Create()); + + // Lambda function to check bad input, the function will return + // OK if running tool to add range partition return RuntimeError, + // which as we expect. + const auto check_bad_input = [&](const string& lower_bound_json, + const string& upper_bound_json, + const string& lower_bound_type, + const string& upper_bound_type, + const string& error) { + string out, err; + Status s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTestTableName, + lower_bound_json, + upper_bound_json, + Substitute("-lower_bound_type=$0", lower_bound_type), + Substitute("-upper_bound_type=$0", upper_bound_type), + }, &out, &err); + ASSERT_TRUE(s.IsRuntimeError()); + ASSERT_STR_CONTAINS(err, error); + }; + + // Test providing wrong type of range lower bound type, it will return error. + NO_FATALS(check_bad_input("[]", "[]", "test_lower_bound_type", + "EXCLUSIVE_BOUND", + "wrong type of range lower bound")); + + // Test providing wrong type of range upper bound type, it will return error. + NO_FATALS(check_bad_input("[]", "[]", "INCLUSIVE_BOUND", + "test_upper_bound_type", + "wrong type of range upper bound")); + + // Test providing wrong number of range values, it will return error. 
+ NO_FATALS(check_bad_input("[1,2]", "[3]", "INCLUSIVE_BOUND", + "EXCLUSIVE_BOUND", + "wrong number of range columns specified: " + "expected 1 but received 2")); + + // Test providing wrong type of range partition key: string instead of int, + // it will return error. + NO_FATALS(check_bad_input("[\"hello\"]", "[\"world\"]", "INCLUSIVE_BOUND", + "EXCLUSIVE_BOUND", + "unable to parse value")); + + // Test providing incomplete json array of range bound, it will return error. + NO_FATALS(check_bad_input("[", "[2]", "INCLUSIVE_BOUND", + "EXCLUSIVE_BOUND", + "JSON text is corrupt")); + + // Test providing wrong json array format of range bound, it will return error. + NO_FATALS(check_bad_input("[1,]", "[2]", "INCLUSIVE_BOUND", + "EXCLUSIVE_BOUND", + "JSON text is corrupt")); + + // Test providing wrong JSON that's not an array, it will return error. + NO_FATALS(check_bad_input( + "{ \"key\" : 1}", "{\"key\" : 2 }", "INCLUSIVE_BOUND", + "EXCLUSIVE_BOUND", + "wrong type during field extraction: expected object array")); +} + +TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + + NO_FATALS(BuildAndStart()); + + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + const string kTestTableName = "TestTable2"; + + { + // Build the schema. 
+ KuduSchema schema; + KuduSchemaBuilder builder; + builder.AddColumn("key_INT8")->Type(KuduColumnSchema::INT8)->NotNull(); + builder.AddColumn("key_INT16")->Type(KuduColumnSchema::INT16)->NotNull(); + builder.AddColumn("key_INT32")->Type(KuduColumnSchema::INT32)->NotNull(); + builder.AddColumn("key_INT64")->Type(KuduColumnSchema::INT64)->NotNull(); + builder.AddColumn("key_UNIXTIME_MICROS")-> + Type(KuduColumnSchema::UNIXTIME_MICROS)->NotNull(); + builder.AddColumn("key_BINARY")->Type(KuduColumnSchema::BINARY)->NotNull(); + builder.AddColumn("key_STRING")->Type(KuduColumnSchema::STRING)->NotNull(); + builder.SetPrimaryKey({ "key_INT8", "key_INT16", "key_INT32", + "key_INT64", "key_UNIXTIME_MICROS", + "key_BINARY", "key_STRING" }); + ASSERT_OK(builder.Build(&schema)); + + // Init the range partition and create table. + unique_ptr lower_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt8("key_INT8", 0)); + ASSERT_OK(lower_bound->SetInt16("key_INT16", 1)); + ASSERT_OK(lower_bound->SetInt32("key_INT32", 2)); + ASSERT_OK(lower_bound->SetInt64("key_INT64", 3)); + ASSERT_OK(lower_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 4)); + ASSERT_OK(lower_bound->SetBinaryCopy("key_BINARY", "a")); + ASSERT_OK(lower_bound->SetString("key_STRING", "b")); + + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(upper_bound->SetInt8("key_INT8", 5)); + ASSERT_OK(upper_bound->SetInt16("key_INT16", 6)); + ASSERT_OK(upper_bound->SetInt32("key_INT32", 7)); + ASSERT_OK(upper_bound->SetInt64("key_INT64", 8)); + ASSERT_OK(upper_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 9)); + ASSERT_OK(upper_bound->SetBinaryCopy("key_BINARY", "c")); + ASSERT_OK(upper_bound->SetString("key_STRING", "d")); + + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTestTableName) + .schema(&schema) + .set_range_partition_columns({ "key_INT8", "key_INT16", "key_INT32", + "key_INT64", "key_UNIXTIME_MICROS", + "key_BINARY", "key_STRING" }) + 
.add_range_partition(lower_bound.release(), upper_bound.release()) + .num_replicas(FLAGS_num_replicas) + .Create()); + } + + // Add range partition use CLI. + string stdout, stderr; + Status s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTestTableName, + "[10, 11, 12, 13, 14, \"e\", \"f\"]", + "[15, 16, 17, 18, 19, \"g\", \"h\"]", + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + + client::sp::shared_ptr table; + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + { + // Insert test row. + auto session = client_->NewSession(); + unique_ptr insert(table->NewInsert()); + auto* row = insert->mutable_row(); + ASSERT_OK(row->SetInt8("key_INT8", 10)); + ASSERT_OK(row->SetInt16("key_INT16", 11)); + ASSERT_OK(row->SetInt32("key_INT32", 12)); + ASSERT_OK(row->SetInt64("key_INT64", 13)); + ASSERT_OK(row->SetUnixTimeMicros("key_UNIXTIME_MICROS", 14)); + ASSERT_OK(row->SetBinaryCopy("key_BINARY", "e")); + ASSERT_OK(row->SetString("key_STRING", "f")); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + ASSERT_OK(session->Close()); + } + + // There is 1 row in table now. + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EQ(1, CountTableRows(table.get())); + + // Drop range partition use CLI. + s = RunKuduTool({ + "table", + "drop_range_partition", + master_addr, + kTestTableName, + "[10, 11, 12, 13, 14, \"e\", \"f\"]", + "[15, 16, 17, 18, 19, \"g\", \"h\"]", + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + + // There are 0 rows left. 
+ ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EQ(0, CountTableRows(table.get())); +} + } // namespace tools } // namespace kudu diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 6ca7373..0abed54 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -26,9 +26,11 @@ #include #include +#include #include #include #include +#include #include #include "kudu/client/client.h" @@ -38,6 +40,7 @@ #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" #include "kudu/client/value.h" +#include "kudu/common/partial_row.h" #include "kudu/common/partition.h" #include "kudu/common/schema.h" #include "kudu/gutil/map-util.h" @@ -61,6 +64,7 @@ using kudu::client::KuduScanner; using kudu::client::KuduSchema; using kudu::client::KuduTable; using kudu::client::KuduTableAlterer; +using kudu::client::KuduTableCreator; using kudu::client::internal::ReplicaController; using std::cerr; using std::cout; @@ -88,6 +92,12 @@ DEFINE_bool(modify_external_catalogs, true, DEFINE_string(config_names, "", "Comma-separated list of configurations to display. " "An empty value displays all configs."); +DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND", + "The type of the lower bound, either inclusive or exclusive. " + "Defaults to inclusive. This flag is case-insensitive."); +DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND", + "The type of the upper bound, either inclusive or exclusive. " + "Defaults to exclusive. 
This flag is case-insensitive."); DECLARE_bool(show_values); DECLARE_string(tables); @@ -147,6 +157,14 @@ const char* const kNewColumnNameArg = "new_column_name"; const char* const kKeyArg = "primary_key"; const char* const kConfigNameArg = "config_name"; const char* const kConfigValueArg = "config_value"; +const char* const kErrorMsgArg = "unable to parse value $0 for column $1 of type $2"; +const char* const kTableRangeLowerBoundArg = "table_range_lower_bound"; +const char* const kTableRangeUpperBoundArg = "table_range_upper_bound"; + +enum PartitionAction { + ADD, + DROP, +}; Status DeleteTable(const RunnerContext& context) { const string& table_name = FindOrDie(context.required_args, kTableNameArg); @@ -464,6 +482,178 @@ Status GetExtraConfigs(const RunnerContext& context) { return data_table.PrintTo(cout); } +Status ConvertToKuduPartialRow(const client::sp::shared_ptr& table, + const string& range_bound_str, + KuduPartialRow* range_bound_partial_row) { + JsonReader reader(range_bound_str); + RETURN_NOT_OK(reader.Init()); + vector values; + RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), + /*field=*/nullptr, + &values)); + + // If range_bound_str is empty, an empty row will be used. 
+ if (values.empty()) { + return Status::OK(); + } + const Schema& schema = KuduSchema::ToSchema(table->schema()); + const auto& partition_schema = table->partition_schema(); + vector key_indexes; + RETURN_NOT_OK(partition_schema.GetRangeSchemaColumnIndexes(schema, &key_indexes)); + if (values.size() != key_indexes.size()) { + return Status::InvalidArgument( + Substitute("wrong number of range columns specified: expected $0 but received $1", + key_indexes.size(), + values.size())); + } + + for (int i = 0; i < values.size(); i++) { + const auto key_index = key_indexes[i]; + const auto& column = table->schema().Column(key_index); + const auto& col_name = column.name(); + const auto type = column.type(); + const auto error_msg = Substitute(kErrorMsgArg, values[i], col_name, + KuduColumnSchema::DataTypeToString(type)); + switch (type) { + case KuduColumnSchema::INT8: { + int64_t value; + RETURN_NOT_OK_PREPEND( + reader.ExtractInt64(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetInt8(col_name, static_cast(value)); + break; + } + case KuduColumnSchema::INT16: { + int64_t value; + RETURN_NOT_OK_PREPEND( + reader.ExtractInt64(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetInt16(col_name, static_cast(value)); + break; + } + case KuduColumnSchema::INT32: { + int32_t value; + RETURN_NOT_OK_PREPEND( + reader.ExtractInt32(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetInt32(col_name, value); + break; + } + case KuduColumnSchema::INT64: { + int64_t value; + RETURN_NOT_OK_PREPEND( + reader.ExtractInt64(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetInt64(col_name, value); + break; + } + case KuduColumnSchema::UNIXTIME_MICROS: { + int64_t value; + RETURN_NOT_OK_PREPEND( + reader.ExtractInt64(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetUnixTimeMicros(col_name, value); + break; + } + case 
KuduColumnSchema::BINARY: { + string value; + RETURN_NOT_OK_PREPEND( + reader.ExtractString(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetBinary(col_name, value); + break; + } + case KuduColumnSchema::STRING: { + string value; + RETURN_NOT_OK_PREPEND( + reader.ExtractString(values[i], /*field=*/nullptr, &value), + error_msg); + range_bound_partial_row->SetString(col_name, value); + break; + } + case KuduColumnSchema::BOOL: + case KuduColumnSchema::FLOAT: + case KuduColumnSchema::DOUBLE: + case KuduColumnSchema::DECIMAL: + default: + return Status::NotSupported( + Substitute("unsupported type $0 for key column '$1': " + "is this tool out of date?", + KuduColumnSchema::DataTypeToString(type), + col_name)); + } + } + return Status::OK(); +} + +Status ModifyRangePartition(const RunnerContext& context, PartitionAction action) { + const string& table_name = FindOrDie(context.required_args, kTableNameArg); + const string& table_range_lower_bound = FindOrDie(context.required_args, + kTableRangeLowerBoundArg); + const string& table_range_upper_bound = FindOrDie(context.required_args, + kTableRangeUpperBoundArg); + + const auto convert_bounds_type = [&] (const string& range_bound, + const string& flags_range_bound_type, + KuduTableCreator::RangePartitionBound* range_bound_type) + -> Status { + string inclusive_bound = boost::iequals(flags_range_bound_type, "INCLUSIVE_BOUND") ? + "INCLUSIVE_BOUND" : ""; + string exclusive_bound = boost::iequals(flags_range_bound_type, "EXCLUSIVE_BOUND") ? + "EXCLUSIVE_BOUND" : ""; + + if (inclusive_bound.empty() && exclusive_bound.empty()) { + return Status::InvalidArgument(Substitute( + "wrong type of range $0 : only 'INCLUSIVE_BOUND' or " + "'EXCLUSIVE_BOUND' can be received", range_bound)); + + } + *range_bound_type = exclusive_bound.empty() ? 
KuduTableCreator::INCLUSIVE_BOUND : + KuduTableCreator::EXCLUSIVE_BOUND; + + return Status::OK(); + }; + + client::sp::shared_ptr client; + client::sp::shared_ptr table; + RETURN_NOT_OK(CreateKuduClient(context, &client)); + RETURN_NOT_OK(client->OpenTable(table_name, &table)); + const auto& schema = table->schema(); + + unique_ptr lower_bound(schema.NewRow()); + unique_ptr upper_bound(schema.NewRow()); + + RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_lower_bound, lower_bound.get())); + RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_upper_bound, upper_bound.get())); + + KuduTableCreator::RangePartitionBound lower_bound_type; + KuduTableCreator::RangePartitionBound upper_bound_type; + + RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type)); + RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type)); + + unique_ptr alterer(client->NewTableAlterer(table_name)); + if (action == PartitionAction::ADD) { + return alterer->AddRangePartition(lower_bound.release(), + upper_bound.release(), + lower_bound_type, + upper_bound_type)->Alter(); + } + DCHECK_EQ(PartitionAction::DROP, action); + return alterer->DropRangePartition(lower_bound.release(), + upper_bound.release(), + lower_bound_type, + upper_bound_type)->Alter(); +} + +Status DropRangePartition(const RunnerContext& context) { + return ModifyRangePartition(context, PartitionAction::DROP); +} + +Status AddRangePartition(const RunnerContext& context) { + return ModifyRangePartition(context, PartitionAction::ADD); +} + } // anonymous namespace unique_ptr BuildTableMode() { @@ -576,11 +766,46 @@ unique_ptr BuildTableMode() { "Name of the table for which to get extra configurations" }) .AddOptionalParameter("config_names") .Build(); + unique_ptr drop_range_partition = + ActionBuilder("drop_range_partition", &DropRangePartition) + .Description("Drop a range partition of table") + .AddRequiredParameter({ kMasterAddressesArg, 
kMasterAddressesArgDesc }) + .AddRequiredParameter({ kTableNameArg, "Name of the table" }) + .AddRequiredParameter({ kTableRangeLowerBoundArg, + "String representation of lower bound of " + "the table range partition as a JSON array" }) + .AddRequiredParameter({ kTableRangeUpperBoundArg, + "String representation of upper bound of " + "the table range partition as a JSON array" }) + .AddOptionalParameter("lower_bound_type") + .AddOptionalParameter("upper_bound_type") + .Build(); + + unique_ptr add_range_partition = + ActionBuilder("add_range_partition", &AddRangePartition) + .Description("Add a range partition for table") + .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) + .AddRequiredParameter({ kTableNameArg, "Name of the table" }) + .AddRequiredParameter({ kTableRangeLowerBoundArg, + "String representation of lower bound of " + "the table range partition as a JSON array." + "If the parameter is an empty array, " + "the lower range partition will be unbounded" }) + .AddRequiredParameter({ kTableRangeUpperBoundArg, + "String representation of upper bound of " + "the table range partition as a JSON array." + "If the parameter is an empty array, " + "the upper range partition will be unbounded" }) + .AddOptionalParameter("lower_bound_type") + .AddOptionalParameter("upper_bound_type") + .Build(); return ModeBuilder("table") .Description("Operate on Kudu tables") + .AddAction(std::move(add_range_partition)) .AddAction(std::move(delete_table)) .AddAction(std::move(describe_table)) + .AddAction(std::move(drop_range_partition)) .AddAction(std::move(list_tables)) .AddAction(std::move(locate_row)) .AddAction(std::move(rename_column))