kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] branch master updated: KUDU-2881 Support create/drop range partition by command line
Date Tue, 30 Jul 2019 04:07:33 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e0149  KUDU-2881 Support create/drop range partition by command line
58e0149 is described below

commit 58e01492320f954cd173d5ad3638d8f0530eb086
Author: honeyhexin <honeyhexin@sohu.com>
AuthorDate: Sun Jul 28 15:29:07 2019 +0800

    KUDU-2881 Support create/drop range partition by command line
    
    Sometimes we need to drop a range partition and then recreate it so
    that its data can be rewritten, e.g. when the partition was written
    incorrectly. This patch adds support for adding/dropping range
    partitions from the command line. The commands can be used as:
    1. kudu table add_range_partition <master_addresses> <table_name>
    <lower_bound> <upper_bound> [-lower_bound_type] [-upper_bound_type]
    2. kudu table drop_range_partition <master_addresses> <table_name>
    <lower_bound> <upper_bound> [-lower_bound_type] [-upper_bound_type]
    
    The lower_bound and upper_bound parameters can be empty JSON arrays
    ("[]"), in which case that side of the range is unbounded. If both
    parameters are empty, the range partition is fully unbounded.
    
    The lower_bound_type and upper_bound_type parameters are optional. If
    they are not specified, the default values are INCLUSIVE_BOUND and
    EXCLUSIVE_BOUND respectively. Both parameters are
    case-insensitive.
    
    Change-Id: I647d9c3cf01807bd8ae9f8cf4091568ea6f1161c
    Reviewed-on: http://gerrit.cloudera.org:8080/13833
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/partition.cc        |  14 +
 src/kudu/common/partition.h         |   6 +
 src/kudu/tools/kudu-admin-test.cc   | 507 ++++++++++++++++++++++++++++++++++++
 src/kudu/tools/tool_action_table.cc | 225 ++++++++++++++++
 4 files changed, 752 insertions(+)

diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 616b75d..f406965 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -1239,4 +1239,18 @@ Status PartitionSchema::MakeUpperBoundRangePartitionKeyExclusive(KuduPartialRow*
   return Status::OK();
 }
 
+Status PartitionSchema::GetRangeSchemaColumnIndexes(const Schema& schema,
+                                                    vector<int32_t>* range_column_idxs) const {
+  for (const ColumnId& column_id : range_schema_.column_ids) {
+    int32_t idx = schema.find_column_by_id(column_id);
+    if (idx == Schema::kColumnNotFound) {
+      return Status::InvalidArgument(Substitute("range partition column ID $0 "
+                                                "not found in range partition key schema.",
+                                                column_id));
+    }
+    range_column_idxs->push_back(idx);
+  }
+  return Status::OK();
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 47baf93..d8e499b 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -257,6 +257,12 @@ class PartitionSchema {
     return hash_bucket_schemas_;
   }
 
+  // Gets the vector containing the column indexes of the range partition keys.
+  // If any of the range partition columns is not found in 'schema', an
+  // InvalidArgument status is returned.
+  Status GetRangeSchemaColumnIndexes(const Schema& schema,
+                                     std::vector<int>* range_column_idxs) const;
+
  private:
   friend class PartitionPruner;
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds);
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 1110ec3..b19b0aa 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -35,6 +35,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
@@ -50,6 +51,7 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strip.h"
@@ -2341,5 +2343,510 @@ TEST_F(AdminCliTest, TestExtraConfig) {
   }
 }
 
+// Inserts num_rows rows into the table, with keys from start_key (inclusive)
+// to start_key + num_rows (exclusive). The other two columns are set to fixed
+// values. If applying an insert fails with an IOError (expected when no range
+// partition covers the key), the function returns NotFound.
+static Status InsertTestRows(const client::sp::shared_ptr<KuduClient>& client,
+                             const string& table_name,
+                             int start_key,
+                             int num_rows) {
+  client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client->OpenTable(table_name, &table));
+  auto session = client->NewSession();
+  for (int i = start_key; i < num_rows + start_key; ++i) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    RETURN_NOT_OK(row->SetInt32("key", i));
+    RETURN_NOT_OK(row->SetInt32("int_val", 12345));
+    RETURN_NOT_OK(row->SetString("string_val", "hello"));
+    Status result = session->Apply(insert.release());
+    if (result.IsIOError()) {
+      vector<kudu::client::KuduError*> errors;
+      ElementDeleter drop(&errors);
+      bool overflowed;
+      session->GetPendingErrors(&errors, &overflowed);
+      EXPECT_FALSE(overflowed);
+      EXPECT_EQ(1, errors.size());
+      EXPECT_TRUE(errors[0]->status().IsNotFound());
+      return Status::NotFound(Substitute("No range partition covers the insert value $0", i));
+    }
+    RETURN_NOT_OK(result);
+  }
+  RETURN_NOT_OK(session->Flush());
+  RETURN_NOT_OK(session->Close());
+  return Status::OK();
+}
+
+TEST_F(AdminCliTest, TestAddAndDropUnboundedPartition) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+
+  NO_FATALS(BuildAndStart());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  client::sp::shared_ptr<KuduTable> table;
+
+  // At first, the range partition is unbounded, so we can insert any data into it.
+  // We insert 100 rows with keys from 0 to 99; now there are 100 rows.
+  int num_rows = 100;
+  NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows));
+  ASSERT_OK(client_->OpenTable(kTableId, &table));
+  ASSERT_EQ(100, CountTableRows(table.get()));
+
+  // For a table with an unbounded range partition, adding any range partition
+  // will conflict with it, so we need to drop the unbounded range partition
+  // first and then add a new one. Since the partition is unbounded, dropping
+  // it drops all rows of the table. After dropping there will be 0 rows
+  // left.
+  string stdout, stderr;
+  Status s = RunKuduTool({
+    "table",
+    "drop_range_partition",
+    master_addr,
+    kTableId,
+    "[]",
+    "[]",
+  }, &stdout, &stderr);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+  ASSERT_OK(client_->OpenTable(kTableId, &table));
+  ASSERT_EQ(0, CountTableRows(table.get()));
+
+  // Since the unbounded partition has been dropped, now we can add a new unbounded
+  // range partition for the table.
+  s = RunKuduTool({
+    "table",
+    "add_range_partition",
+    master_addr,
+    kTableId,
+    "[]",
+    "[]",
+  }, &stdout, &stderr);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+
+  // Insert 100 rows with keys from 0 to 99;
+  // now there are 100 rows again.
+  NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows));
+  ASSERT_OK(client_->OpenTable(kTableId, &table));
+  ASSERT_EQ(100, CountTableRows(table.get()));
+}
+
+TEST_F(AdminCliTest, TestAddAndDropRangePartition) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+
+  NO_FATALS(BuildAndStart());
+
+  // The kTableId's range partition is unbounded, so we need to create another table to
+  // add or drop range partition.
+  const string kTestTableName = "TestTable0";
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  // Build the schema.
+  KuduSchema schema;
+  KuduSchemaBuilder builder;
+  builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull();
+  builder.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+  builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull();
+  builder.SetPrimaryKey({ "key" });
+  ASSERT_OK(builder.Build(&schema));
+
+  // Set up partitioning and create the table.
+  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+  ASSERT_OK(lower_bound->SetInt32("key", 0));
+  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+  ASSERT_OK(upper_bound->SetInt32("key", 100));
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTestTableName)
+      .schema(&schema)
+      .set_range_partition_columns({ "key" })
+      .add_range_partition(lower_bound.release(), upper_bound.release())
+      .num_replicas(FLAGS_num_replicas)
+      .Create());
+
+  client::sp::shared_ptr<KuduTable> table;
+
+  // Lambda function to add range partition using kudu CLI.
+  const auto add_range_partition_using_CLI = [&] (const string& lower_bound_json,
+                                                  const string& upper_bound_json,
+                                                  const string& lower_bound_type,
+                                                  const string& upper_bound_type) -> Status {
+    string error, out;
+    Status s = RunKuduTool({
+      "table",
+      "add_range_partition",
+      master_addr,
+      kTestTableName,
+      lower_bound_json,
+      upper_bound_json,
+      Substitute("-lower_bound_type=$0", lower_bound_type),
+      Substitute("-upper_bound_type=$0", upper_bound_type),
+    }, &out, &error);
+    return s;
+  };
+
+  // Lambda function to drop range partition using kudu CLI.
+  const auto drop_range_partition_using_CLI = [&] (const string& lower_bound_json,
+                                                   const string& upper_bound_json,
+                                                   const string& lower_bound_type,
+                                                   const string& upper_bound_type) -> Status {
+    string error, out;
+    Status s = RunKuduTool({
+      "table",
+      "drop_range_partition",
+      master_addr,
+      kTestTableName,
+      lower_bound_json,
+      upper_bound_json,
+      Substitute("-lower_bound_type=$0", lower_bound_type),
+      Substitute("-upper_bound_type=$0", upper_bound_type),
+    }, &out, &error);
+    return s;
+  };
+
+  const auto check_bounds = [&] (const string& lower_bound,
+                                 const string& upper_bound,
+                                 const string& lower_bound_type,
+                                 const string& upper_bound_type,
+                                 int start_row_to_insert,
+                                 int num_rows_to_insert,
+                                 vector<int> out_of_range_rows_to_insert) {
+    string lower_bound_type_internal = lower_bound_type.empty() ? "INCLUSIVE_BOUND" :
+        lower_bound_type;
+
+    string upper_bound_type_internal = upper_bound_type.empty() ? "EXCLUSIVE_BOUND" :
+        upper_bound_type;
+
+    // Add range partition.
+    ASSERT_OK(add_range_partition_using_CLI(lower_bound, upper_bound,
+                                            lower_bound_type_internal,
+                                            upper_bound_type_internal));
+
+    // Insert num_rows_to_insert rows to table.
+    ASSERT_OK(InsertTestRows(client_, kTestTableName, start_row_to_insert,
+                             num_rows_to_insert));
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+    ASSERT_EQ(num_rows_to_insert, CountTableRows(table.get()));
+
+    // Insert rows outside range partition,
+    // which will return error in lambda as we expect.
+    for (auto& value: out_of_range_rows_to_insert) {
+      EXPECT_TRUE(InsertTestRows(client_, kTestTableName, value, 1).IsNotFound());
+    }
+
+    // Drop range partition.
+    ASSERT_OK(drop_range_partition_using_CLI(lower_bound, upper_bound,
+                                             lower_bound_type_internal,
+                                             upper_bound_type_internal));
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+
+    // Verify no rows are left.
+    ASSERT_EQ(0, CountTableRows(table.get()));
+  };
+
+  {
+    // Test specifying the range bound type in lower-case.
+
+    // Drop the range partition added when the table was created; that range
+    // partition is [0,100). Insert 100 rows with keys 0-99. Now there are 100 rows.
+    NO_FATALS(InsertTestRows(client_, kTestTableName, 0, 100));
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+    ASSERT_EQ(100, CountTableRows(table.get()));
+
+    // Drop range partition of [0,100) by command line, now there are 0 rows left.
+    ASSERT_OK(drop_range_partition_using_CLI("[0]", "[100]", "inclusive_bound",
+                                             "exclusive_bound"));
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+    ASSERT_EQ(0, CountTableRows(table.get()));
+  }
+
+  {
+    // Test adding (INCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition.
+
+    // Adding [100,200), 100 is inclusive and 200 is exclusive. Then insert
+    // 100 rows, the data range is [100-199]. Insert 99 and 200 to test
+    // insert out of range row. Last, dropping the range partition and checking
+    // that there are 0 rows left.
+    check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 100, 100,
+        { 99, 200 });
+  }
+
+  {
+    // Test adding (INCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition.
+
+    // Adding [100,200], both 100 and 200 are inclusive. Then insert 101
+    // rows, the data range is [100,200]. Insert 99 and 201 to test insert
+    // out of range row. Last, dropping the range partition and checking
+    // that there are 0 rows left.
+    check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "INCLUSIVE_BOUND", 100, 101,
+        { 99, 201 });
+  }
+
+
+  {
+    // Test adding (EXCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition.
+
+    // Adding (100,200], 100 is exclusive while 200 is inclusive. Then insert
+    // 100 rows, the data range is (100,200]. Insert 100 and 201 to test
+    // insert out of range row. Last, dropping the range partition and checking
+    // that there are 0 rows left.
+    check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "INCLUSIVE_BOUND", 101, 100,
+        { 100, 201 });
+  }
+
+  {
+    // Test adding (EXCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition.
+
+    // Adding (100,200), both 100 and 200 are exclusive. Then insert 99 rows,
+    // the data range is (100,200). Insert 100 and 200 to test insert out of
+    // range row. Last, dropping the range partition and checking that there
+    // are 0 rows left.
+    check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 101, 99,
+        { 100, 200 });
+  }
+
+  {
+    // Test adding (INCLUSIVE_BOUND, UNBOUNDED) range partition.
+
+    // Adding [1,unbounded), lower range bound is 1, upper range bound is unbounded,
+    // 1 is inclusive. Then insert 100 rows, the data range is [1-100]. Insert 0
+    // to test insert out of range row. Last, dropping the range partition and
+    // checking that there are 0 rows left.
+    check_bounds("[1]", "[]", "INCLUSIVE_BOUND", "", 1, 100,
+        { 0 });
+  }
+
+  {
+    // Test adding (EXCLUSIVE_BOUND, UNBOUNDED) range partition.
+
+    // Adding (1,unbounded), lower range bound is 1, upper range bound is unbounded,
+    // 1 is exclusive. Then insert 100 rows, the data range
+    // is [2-101]. Insert 1 to test insert out of range row. Last, dropping the range
+    // partition and checking that there are 0 rows left.
+    check_bounds("[1]", "[]", "EXCLUSIVE_BOUND", "", 2, 100,
+        { 1 });
+  }
+
+  {
+    // Test adding (UNBOUNDED, INCLUSIVE_BOUND) range partition.
+
+    // Adding (unbounded,100], lower range bound is unbounded, upper range bound is 100,
+    // 100 is inclusive. Then insert 100 rows, the data range
+    // is [1-100]. Insert 101 to test insert out of range row. Last, dropping the range
+    // partition and checking that there are 0 rows left.
+    check_bounds("[]", "[100]", "", "INCLUSIVE_BOUND", 1, 100,
+        { 101 });
+  }
+
+  {
+    // Test adding (UNBOUNDED, EXCLUSIVE_BOUND) range partition.
+
+    // Adding (unbounded,100), lower range bound is unbounded, upper range bound is 100,
+    // 100 is exclusive. Then insert 100 rows, the data range
+    // is [0-99]. Insert 100 to test insert out of range row. Last, dropping the range
+    // partition and checking that there are 0 rows left.
+    check_bounds("[]", "[100]", "", "EXCLUSIVE_BOUND", 0, 100,
+        { 100 });
+  }
+}
+
+TEST_F(AdminCliTest, TestAddAndDropRangePartitionWithWrongParameters) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+
+  NO_FATALS(BuildAndStart());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  const string kTestTableName = "TestTable1";
+
+  // Build the schema.
+  KuduSchema schema;
+  KuduSchemaBuilder builder;
+  builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull();
+  builder.SetPrimaryKey({ "key" });
+  ASSERT_OK(builder.Build(&schema));
+
+  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+  ASSERT_OK(lower_bound->SetInt32("key", 0));
+  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+  ASSERT_OK(upper_bound->SetInt32("key", 1));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTestTableName)
+      .schema(&schema)
+      .set_range_partition_columns({ "key" })
+      .add_range_partition(lower_bound.release(), upper_bound.release())
+      .num_replicas(FLAGS_num_replicas)
+      .Create());
+
+  // Lambda function to check bad input: it asserts that running the tool
+  // to add a range partition fails with RuntimeError and that the tool's
+  // error output contains the expected message.
+  const auto check_bad_input = [&](const string& lower_bound_json,
+                                   const string& upper_bound_json,
+                                   const string& lower_bound_type,
+                                   const string& upper_bound_type,
+                                   const string& error) {
+    string out, err;
+    Status s = RunKuduTool({
+      "table",
+      "add_range_partition",
+      master_addr,
+      kTestTableName,
+      lower_bound_json,
+      upper_bound_json,
+      Substitute("-lower_bound_type=$0", lower_bound_type),
+      Substitute("-upper_bound_type=$0", upper_bound_type),
+    }, &out, &err);
+    ASSERT_TRUE(s.IsRuntimeError());
+    ASSERT_STR_CONTAINS(err, error);
+  };
+
+  // Test providing wrong type of range lower bound type, it will return error.
+  NO_FATALS(check_bad_input("[]", "[]", "test_lower_bound_type",
+                            "EXCLUSIVE_BOUND",
+                            "wrong type of range lower bound"));
+
+  // Test providing wrong type of range upper bound type, it will return error.
+  NO_FATALS(check_bad_input("[]", "[]", "INCLUSIVE_BOUND",
+                            "test_upper_bound_type",
+                            "wrong type of range upper bound"));
+
+  // Test providing wrong number of range values, it will return error.
+  NO_FATALS(check_bad_input("[1,2]", "[3]", "INCLUSIVE_BOUND",
+                            "EXCLUSIVE_BOUND",
+                            "wrong number of range columns specified: "
+                            "expected 1 but received 2"));
+
+  // Test providing wrong type of range partition key: string instead of int,
+  // it will return error.
+  NO_FATALS(check_bad_input("[\"hello\"]", "[\"world\"]", "INCLUSIVE_BOUND",
+                            "EXCLUSIVE_BOUND",
+                            "unable to parse value"));
+
+  // Test providing incomplete json array of range bound, it will return error.
+  NO_FATALS(check_bad_input("[", "[2]", "INCLUSIVE_BOUND",
+                            "EXCLUSIVE_BOUND",
+                            "JSON text is corrupt"));
+
+  // Test providing wrong json array format of range bound, it will return error.
+  NO_FATALS(check_bad_input("[1,]", "[2]", "INCLUSIVE_BOUND",
+                            "EXCLUSIVE_BOUND",
+                            "JSON text is corrupt"));
+
+  // Test providing wrong JSON that's not an array, it will return error.
+  NO_FATALS(check_bad_input(
+      "{ \"key\" : 1}", "{\"key\" : 2 }", "INCLUSIVE_BOUND",
+      "EXCLUSIVE_BOUND",
+      "wrong type during field extraction: expected object array"));
+}
+
+TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+
+  NO_FATALS(BuildAndStart());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  const string kTestTableName = "TestTable2";
+
+  {
+    // Build the schema.
+    KuduSchema schema;
+    KuduSchemaBuilder builder;
+    builder.AddColumn("key_INT8")->Type(KuduColumnSchema::INT8)->NotNull();
+    builder.AddColumn("key_INT16")->Type(KuduColumnSchema::INT16)->NotNull();
+    builder.AddColumn("key_INT32")->Type(KuduColumnSchema::INT32)->NotNull();
+    builder.AddColumn("key_INT64")->Type(KuduColumnSchema::INT64)->NotNull();
+    builder.AddColumn("key_UNIXTIME_MICROS")->
+      Type(KuduColumnSchema::UNIXTIME_MICROS)->NotNull();
+    builder.AddColumn("key_BINARY")->Type(KuduColumnSchema::BINARY)->NotNull();
+    builder.AddColumn("key_STRING")->Type(KuduColumnSchema::STRING)->NotNull();
+    builder.SetPrimaryKey({ "key_INT8", "key_INT16", "key_INT32",
+                            "key_INT64", "key_UNIXTIME_MICROS",
+                            "key_BINARY", "key_STRING" });
+    ASSERT_OK(builder.Build(&schema));
+
+    // Init the range partition and create table.
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt8("key_INT8", 0));
+    ASSERT_OK(lower_bound->SetInt16("key_INT16", 1));
+    ASSERT_OK(lower_bound->SetInt32("key_INT32", 2));
+    ASSERT_OK(lower_bound->SetInt64("key_INT64", 3));
+    ASSERT_OK(lower_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 4));
+    ASSERT_OK(lower_bound->SetBinaryCopy("key_BINARY", "a"));
+    ASSERT_OK(lower_bound->SetString("key_STRING", "b"));
+
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt8("key_INT8", 5));
+    ASSERT_OK(upper_bound->SetInt16("key_INT16", 6));
+    ASSERT_OK(upper_bound->SetInt32("key_INT32", 7));
+    ASSERT_OK(upper_bound->SetInt64("key_INT64", 8));
+    ASSERT_OK(upper_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 9));
+    ASSERT_OK(upper_bound->SetBinaryCopy("key_BINARY", "c"));
+    ASSERT_OK(upper_bound->SetString("key_STRING", "d"));
+
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(kTestTableName)
+        .schema(&schema)
+        .set_range_partition_columns({ "key_INT8", "key_INT16", "key_INT32",
+                                       "key_INT64", "key_UNIXTIME_MICROS",
+                                       "key_BINARY", "key_STRING" })
+        .add_range_partition(lower_bound.release(), upper_bound.release())
+        .num_replicas(FLAGS_num_replicas)
+        .Create());
+  }
+
+  // Add a range partition using the CLI.
+  string stdout, stderr;
+  Status s = RunKuduTool({
+    "table",
+    "add_range_partition",
+    master_addr,
+    kTestTableName,
+    "[10, 11, 12, 13, 14, \"e\", \"f\"]",
+    "[15, 16, 17, 18, 19, \"g\", \"h\"]",
+  }, &stdout, &stderr);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+
+  client::sp::shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+  {
+    // Insert test row.
+    auto session = client_->NewSession();
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    ASSERT_OK(row->SetInt8("key_INT8", 10));
+    ASSERT_OK(row->SetInt16("key_INT16", 11));
+    ASSERT_OK(row->SetInt32("key_INT32", 12));
+    ASSERT_OK(row->SetInt64("key_INT64", 13));
+    ASSERT_OK(row->SetUnixTimeMicros("key_UNIXTIME_MICROS", 14));
+    ASSERT_OK(row->SetBinaryCopy("key_BINARY", "e"));
+    ASSERT_OK(row->SetString("key_STRING", "f"));
+    ASSERT_OK(session->Apply(insert.release()));
+    ASSERT_OK(session->Flush());
+    ASSERT_OK(session->Close());
+  }
+
+  // There is 1 row in table now.
+  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+  ASSERT_EQ(1, CountTableRows(table.get()));
+
+  // Drop the range partition using the CLI.
+  s = RunKuduTool({
+    "table",
+    "drop_range_partition",
+    master_addr,
+    kTestTableName,
+    "[10, 11, 12, 13, 14, \"e\", \"f\"]",
+    "[15, 16, 17, 18, 19, \"g\", \"h\"]",
+  }, &stdout, &stderr);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+
+  // There are 0 rows left.
+  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+  ASSERT_EQ(0, CountTableRows(table.get()));
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 6ca7373..0abed54 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -26,9 +26,11 @@
 #include <utility>
 #include <vector>
 
+#include <boost/algorithm/string/predicate.hpp>
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
+#include <glog/logging.h>
 #include <rapidjson/document.h>
 
 #include "kudu/client/client.h"
@@ -38,6 +40,7 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/client/value.h"
+#include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
@@ -61,6 +64,7 @@ using kudu::client::KuduScanner;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
+using kudu::client::KuduTableCreator;
 using kudu::client::internal::ReplicaController;
 using std::cerr;
 using std::cout;
@@ -88,6 +92,12 @@ DEFINE_bool(modify_external_catalogs, true,
 DEFINE_string(config_names, "",
               "Comma-separated list of configurations to display. "
               "An empty value displays all configs.");
+DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
+              "The type of the lower bound, either inclusive or exclusive. "
+              "Defaults to inclusive. This flag is case-insensitive.");
+DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
+              "The type of the upper bound, either inclusive or exclusive. "
+              "Defaults to exclusive. This flag is case-insensitive.");
 DECLARE_bool(show_values);
 DECLARE_string(tables);
 
@@ -147,6 +157,14 @@ const char* const kNewColumnNameArg = "new_column_name";
 const char* const kKeyArg = "primary_key";
 const char* const kConfigNameArg = "config_name";
 const char* const kConfigValueArg = "config_value";
+const char* const kErrorMsgArg = "unable to parse value $0 for column $1 of type $2";
+const char* const kTableRangeLowerBoundArg = "table_range_lower_bound";
+const char* const kTableRangeUpperBoundArg = "table_range_upper_bound";
+
+enum PartitionAction {
+    ADD,
+    DROP,
+};
 
 Status DeleteTable(const RunnerContext& context) {
   const string& table_name = FindOrDie(context.required_args, kTableNameArg);
@@ -464,6 +482,178 @@ Status GetExtraConfigs(const RunnerContext& context) {
   return data_table.PrintTo(cout);
 }
 
+Status ConvertToKuduPartialRow(const client::sp::shared_ptr<KuduTable>& table,
+                               const string& range_bound_str,
+                               KuduPartialRow* range_bound_partial_row) {
+  JsonReader reader(range_bound_str);
+  RETURN_NOT_OK(reader.Init());
+  vector<const rapidjson::Value *> values;
+  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
+                                          /*field=*/nullptr,
+                                          &values));
+
+  // If range_bound_str is empty, an empty row will be used.
+  if (values.empty()) {
+    return Status::OK();
+  }
+  const Schema& schema = KuduSchema::ToSchema(table->schema());
+  const auto& partition_schema = table->partition_schema();
+  vector<int32_t> key_indexes;
+  RETURN_NOT_OK(partition_schema.GetRangeSchemaColumnIndexes(schema, &key_indexes));
+  if (values.size() != key_indexes.size()) {
+    return Status::InvalidArgument(
+        Substitute("wrong number of range columns specified: expected $0 but received $1",
+                   key_indexes.size(),
+                   values.size()));
+  }
+
+  for (int i = 0; i < values.size(); i++) {
+    const auto key_index = key_indexes[i];
+    const auto& column = table->schema().Column(key_index);
+    const auto& col_name = column.name();
+    const auto type = column.type();
+    const auto error_msg = Substitute(kErrorMsgArg, values[i], col_name,
+        KuduColumnSchema::DataTypeToString(type));
+    switch (type) {
+      case KuduColumnSchema::INT8: {
+        int64_t value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetInt8(col_name, static_cast<int8_t>(value));
+        break;
+      }
+      case KuduColumnSchema::INT16: {
+        int64_t value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetInt16(col_name, static_cast<int16_t>(value));
+        break;
+      }
+      case KuduColumnSchema::INT32: {
+        int32_t value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractInt32(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetInt32(col_name, value);
+        break;
+      }
+      case KuduColumnSchema::INT64: {
+        int64_t value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetInt64(col_name, value);
+        break;
+      }
+      case KuduColumnSchema::UNIXTIME_MICROS: {
+        int64_t value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetUnixTimeMicros(col_name, value);
+        break;
+      }
+      case KuduColumnSchema::BINARY: {
+        string value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractString(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetBinary(col_name, value);
+        break;
+      }
+      case KuduColumnSchema::STRING: {
+        string value;
+        RETURN_NOT_OK_PREPEND(
+            reader.ExtractString(values[i], /*field=*/nullptr, &value),
+            error_msg);
+        range_bound_partial_row->SetString(col_name, value);
+        break;
+      }
+      case KuduColumnSchema::BOOL:
+      case KuduColumnSchema::FLOAT:
+      case KuduColumnSchema::DOUBLE:
+      case KuduColumnSchema::DECIMAL:
+      default:
+        return Status::NotSupported(
+            Substitute("unsupported type $0 for key column '$1': "
+                       "is this tool out of date?",
+                       KuduColumnSchema::DataTypeToString(type),
+                       col_name));
+    }
+  }
+  return Status::OK();
+}
+
+Status ModifyRangePartition(const RunnerContext& context, PartitionAction action) {
+  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+  const string& table_range_lower_bound = FindOrDie(context.required_args,
+                                                    kTableRangeLowerBoundArg);
+  const string& table_range_upper_bound = FindOrDie(context.required_args,
+                                                    kTableRangeUpperBoundArg);
+
+  const auto convert_bounds_type = [&] (const string& range_bound,
+                                        const string& flags_range_bound_type,
+                                        KuduTableCreator::RangePartitionBound* range_bound_type)
+      -> Status {
+    string inclusive_bound = boost::iequals(flags_range_bound_type, "INCLUSIVE_BOUND") ?
+        "INCLUSIVE_BOUND" : "";
+    string exclusive_bound = boost::iequals(flags_range_bound_type, "EXCLUSIVE_BOUND") ?
+        "EXCLUSIVE_BOUND" : "";
+
+    if (inclusive_bound.empty() && exclusive_bound.empty()) {
+      return Status::InvalidArgument(Substitute(
+          "wrong type of range $0 : only 'INCLUSIVE_BOUND' or "
+          "'EXCLUSIVE_BOUND' can be received", range_bound));
+
+    }
+    *range_bound_type = exclusive_bound.empty() ? KuduTableCreator::INCLUSIVE_BOUND :
+        KuduTableCreator::EXCLUSIVE_BOUND;
+
+    return Status::OK();
+  };
+
+  client::sp::shared_ptr<KuduClient> client;
+  client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(CreateKuduClient(context, &client));
+  RETURN_NOT_OK(client->OpenTable(table_name, &table));
+  const auto& schema = table->schema();
+
+  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+
+  RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_lower_bound, lower_bound.get()));
+  RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_upper_bound, upper_bound.get()));
+
+  KuduTableCreator::RangePartitionBound lower_bound_type;
+  KuduTableCreator::RangePartitionBound upper_bound_type;
+
+  RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type));
+  RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type));
+
+  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+  if (action == PartitionAction::ADD) {
+    return alterer->AddRangePartition(lower_bound.release(),
+                                      upper_bound.release(),
+                                      lower_bound_type,
+                                      upper_bound_type)->Alter();
+  }
+  DCHECK_EQ(PartitionAction::DROP, action);
+  return alterer->DropRangePartition(lower_bound.release(),
+                                     upper_bound.release(),
+                                     lower_bound_type,
+                                     upper_bound_type)->Alter();
+}
+
+Status DropRangePartition(const RunnerContext& context) {
+  return ModifyRangePartition(context, PartitionAction::DROP);
+}
+
+Status AddRangePartition(const RunnerContext& context) {
+  return ModifyRangePartition(context, PartitionAction::ADD);
+}
+
 } // anonymous namespace
 
 unique_ptr<Mode> BuildTableMode() {
@@ -576,11 +766,46 @@ unique_ptr<Mode> BuildTableMode() {
                               "Name of the table for which to get extra configurations" })
       .AddOptionalParameter("config_names")
       .Build();
+  unique_ptr<Action> drop_range_partition =
+      ActionBuilder("drop_range_partition", &DropRangePartition)
+      .Description("Drop a range partition of table")
+      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+      .AddRequiredParameter({ kTableNameArg, "Name of the table" })
+      .AddRequiredParameter({ kTableRangeLowerBoundArg,
+                              "String representation of lower bound of "
+                              "the table range partition as a JSON array" })
+      .AddRequiredParameter({ kTableRangeUpperBoundArg,
+                              "String representation of upper bound of "
+                              "the table range partition as a JSON array" })
+      .AddOptionalParameter("lower_bound_type")
+      .AddOptionalParameter("upper_bound_type")
+      .Build();
+
+  // Builds the "add_range_partition" action, handled by AddRangePartition().
+  // It takes four required parameters (master addresses, table name, and the
+  // lower/upper range bounds as JSON arrays) plus the optional
+  // lower_bound_type and upper_bound_type parameters.
+  //
+  // NOTE: adjacent string literals are concatenated verbatim, so each
+  // fragment must carry its own trailing space; otherwise the help text
+  // renders as "JSON array.If the parameter ...".
+  unique_ptr<Action> add_range_partition =
+      ActionBuilder("add_range_partition", &AddRangePartition)
+      .Description("Add a range partition for table")
+      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+      .AddRequiredParameter({ kTableNameArg, "Name of the table" })
+      .AddRequiredParameter({ kTableRangeLowerBoundArg,
+                              "String representation of lower bound of "
+                              "the table range partition as a JSON array. "
+                              "If the parameter is an empty array, "
+                              "the lower range partition will be unbounded" })
+      .AddRequiredParameter({ kTableRangeUpperBoundArg,
+                              "String representation of upper bound of "
+                              "the table range partition as a JSON array. "
+                              "If the parameter is an empty array, "
+                              "the upper range partition will be unbounded" })
+      .AddOptionalParameter("lower_bound_type")
+      .AddOptionalParameter("upper_bound_type")
+      .Build();
 
   return ModeBuilder("table")
       .Description("Operate on Kudu tables")
+      .AddAction(std::move(add_range_partition))
       .AddAction(std::move(delete_table))
       .AddAction(std::move(describe_table))
+      .AddAction(std::move(drop_range_partition))
       .AddAction(std::move(list_tables))
       .AddAction(std::move(locate_row))
       .AddAction(std::move(rename_column))


Mime
View raw message