kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [1/2] kudu git commit: KUDU-1861: add range-partitions to loadgen tables
Date Mon, 25 Jun 2018 19:45:42 GMT
Repository: kudu
Updated Branches:
  refs/heads/master f6e8fe6c6 -> 148a0c7be


KUDU-1861: add range-partitions to loadgen tables

This patch adds the ability to generate a range-partitioned table with
the loadgen tool. The range partitioning schema is designed such that
the non-random write workload will insert sequentially on the primary
key, provided the number of threads is equal to the number of tablets.
This sequential workload per tablet both reduces the number of
compactions and avoids bloom filter lookups.

This patch also renames --table_num_buckets to
--table_num_hash_partitions, which can be combined with
--table_num_range_partitions if desired.

I tested this out on a single-tserver cluster and verified via the
metrics logs that the number of bloom lookups for a non-random workload
where the number of insert threads and the number of tablets were equal
stayed at zero. When the number of threads was not a factor of the
number of buckets, the number of bloom lookups was non-zero. See
TestNonRandomWorkloadLoadgen in kudu-tool-test.cc for more details.

Change-Id: If4f552a4c73dc82f3b7934082769522557ee5013
Reviewed-on: http://gerrit.cloudera.org:8080/10633
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7192d7ff
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7192d7ff
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7192d7ff

Branch: refs/heads/master
Commit: 7192d7ff8a6d6f5eaa928a976d2f8b4fd1486770
Parents: f6e8fe6
Author: Andrew Wong <awong@cloudera.com>
Authored: Fri Jun 1 16:17:43 2018 -0700
Committer: Andrew Wong <awong@cloudera.com>
Committed: Mon Jun 25 17:15:06 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/kudu-tool-test.cc   | 127 ++++++++++++++++++++++++++++++
 src/kudu/tools/tool_action_perf.cc | 133 ++++++++++++++++++++++++++++----
 2 files changed, 244 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7192d7ff/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 57bc295..9c41392 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -124,6 +124,9 @@
 
 DECLARE_string(block_manager);
 
+METRIC_DECLARE_counter(bloom_lookups);
+METRIC_DECLARE_entity(tablet);
+
 namespace kudu {
 
 namespace tserver {
@@ -143,6 +146,7 @@ using client::KuduTable;
 using client::sp::shared_ptr;
 using cluster::ExternalMiniCluster;
 using cluster::ExternalMiniClusterOptions;
+using cluster::ExternalTabletServer;
 using cluster::InternalMiniCluster;
 using cluster::InternalMiniClusterOptions;
 using consensus::OpId;
@@ -1464,6 +1468,129 @@ TEST_F(ToolTest, TestLoadgenManualFlush) {
       "bench_manual_flush"));
 }
 
+// Run the loadgen, generating a few different partitioning schemas.
+TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) {
+  {
+    ExternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 1;
+    NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+  }
+  const vector<string> base_args = {
+    "perf", "loadgen",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    // Use a number of threads that isn't a divisor of the number of partitions
+    // so the insertion bounds of the threads don't align with the bounds of
+    // the partitions. This isn't necessary for the correctness of this test,
+    // but is a bit more unusual and, thus, worth testing. See the comments in
+    // tools/tool_action_perf.cc for more details about this partitioning.
+    "--num_threads=3",
+
+    // Keep the tables so we can verify the presence of tablets as we go.
+    "--keep_auto_table=true",
+
+    // Let's make sure nothing breaks even if we insert across the entire
+    // keyspace. If we didn't use `use_random`, the bounds of inserted data
+    // would be limited by the number of rows inserted.
+    "--use_random",
+
+    // Let's also make sure we get the correct results.
+    "--run_scan",
+  };
+
+  const MonoDelta kTimeout = MonoDelta::FromMilliseconds(10);
+  TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+
+  // Test with misconfigured partitioning. This should fail because we disallow
+  // creating tables with "no" partitioning.
+  vector<string> args(base_args);
+  args.emplace_back("--table_num_range_partitions=1");
+  args.emplace_back("--table_num_hash_partitions=1");
+  Status s = RunKuduTool(args);
+  ASSERT_FALSE(s.ok());
+
+  // Now let's try running with a couple range partitions.
+  args = base_args;
+  args.emplace_back("--table_num_range_partitions=2");
+  args.emplace_back("--table_num_hash_partitions=1");
+  int expected_tablets = 2;
+  ASSERT_OK(RunKuduTool(args));
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
+
+  // Now let's try running with only hash partitions.
+  args = base_args;
+  args.emplace_back("--table_num_range_partitions=1");
+  args.emplace_back("--table_num_hash_partitions=2");
+  ASSERT_OK(RunKuduTool(args));
+  // Note: we're not deleting the tables as we go so that we can do this check.
+  // That also means that we have to take into account the previous tables
+  // created during this test.
+  expected_tablets += 2;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
+
+  // And now with both.
+  args = base_args;
+  args.emplace_back("--table_num_range_partitions=2");
+  args.emplace_back("--table_num_hash_partitions=2");
+  expected_tablets += 4;
+  ASSERT_OK(RunKuduTool(args));
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
+}
+
+// Test that a non-random workload results in the behavior we would expect when
+// running against an auto-generated range partitioned table.
+TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
+  {
+    ExternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 1;
+    // Flush frequently so there are bloom files to check.
+    //
+    // Note: we use the number of bloom lookups as a loose indicator of whether
+    // writes are sequential or not. If row A is being inserted to a range of
+    // the keyspace that has already been inserted to, the interval tree that
+    // backs the tablet will be unable to say with certainty that row A does or
+    // doesn't already exist, necessitating a bloom lookup. As such, if there
+    // are bloom lookups for a tablet for a given workload, we can say that
+    // that workload is not sequential.
+    opts.extra_tserver_flags = {
+      "--flush_threshold_mb=1",
+      "--flush_threshold_secs=1",
+    };
+    NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+  }
+  const vector<string> base_args = {
+    "perf", "loadgen",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--keep_auto_table",
+
+    // Use the same number of threads as partitions so when we range partition,
+    // each thread will be writing to a single tablet.
+    "--num_threads=4",
+
+    // Insert a bunch of large rows for us to begin flushing so there are bloom
+    // files to check.
+    "--num_rows_per_thread=10000",
+    "--string_len=32768",
+
+    // Since we're using such large payloads, flush more frequently so the
+    // client doesn't run out of memory.
+    "--flush_per_n_rows=1",
+  };
+
+  // Partition the table so each thread inserts to a single range.
+  vector<string> args = base_args;
+  args.emplace_back("--table_num_range_partitions=4");
+  args.emplace_back("--table_num_hash_partitions=1");
+  ASSERT_OK(RunKuduTool(args));
+
+  // Check that the insert workload didn't require any bloom lookups.
+  ExternalTabletServer* ts = cluster_->tablet_server(0);
+  int64_t bloom_lookups = 0;
+  ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
+      &METRIC_ENTITY_tablet, nullptr, &METRIC_bloom_lookups, "value", &bloom_lookups));
+  ASSERT_EQ(0, bloom_lookups);
+}
+
 // Test 'kudu remote_replica copy' tool when the destination tablet server is online.
 // 1. Test the copy tool when the destination replica is healthy
 // 2. Test the copy tool when the destination replica is tombstoned

http://git-wip-us.apache.org/repos/asf/kudu/blob/7192d7ff/src/kudu/tools/tool_action_perf.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 2aeb514..ec6aea6 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -104,8 +104,61 @@
 // so for the example above each run increments the sequence number by 10000:
 // 1000 rows per thread * 2 threads * 5 columns
 //
-
-#include "kudu/tools/tool_action.h"
+// Auto-generated tables can be configured to use hash partitioning, range
+// partitioning, or both. Below are a few examples of this.
+//
+//   kudu perf loadgen 127.0.0.1 --num_threads=8 --num_rows_per_thread=1000 \
+//     --table_num_hash_partitions=1 --table_num_range_partitions=8 --use_random=false
+//
+// In the above example, a table with eight range partitions will be created,
+// each partition will be in charge of 1000 rows worth of values; for a
+// three-column table, this means a primary key range width of 3000. Eight
+// inserter threads will be created and will insert rows non-randomly; by
+// design of the range partitioning splits, this means each thread will insert
+// into a single range partition.
+//
+//   kudu perf loadgen 127.0.0.1 --num_threads=8 --num_rows_per_thread=1000 \
+//     --table_num_hash_partitions=8 --table_num_range_partitions=1 --use_random=false
+//
+// In the above example, a table with 8 hash partitions will be created.
+//
+//   kudu perf loadgen 127.0.0.1 --num_threads=8 --num_rows_per_thread=1000 \
+//     --table_num_hash_partitions=8 --table_num_range_partitions=8 --use_random=false
+//
+// In the above example, a table with a total of 64 tablets will be created.
+// The range partitioning splits will be the same as those in the
+// range-partitioning-only example.
+//
+// Below are illustrations of range partitioning and non-random write
+// workloads. The y-axis for both the threads and the tablets is the keyspace,
+// increasing going downwards.
+//
+//   --num_threads=2 --table_num_range_partitions=2 --table_num_hash_partitions=1
+//
+//   Threads sequentially
+//   insert to their keyspaces
+//   in non-random insert mode.
+//      +  +---------+         ^
+//      |  | thread1 | tabletA |  Tablets' range partitions are
+//      |  |         |         |  set to match the desired total
+//      v  +---------+---------+  number of inserted rows for the
+//      |  | thread2 | tabletB |  entire workload, but leaving the
+//      |  |         |         |  outermost tablets unbounded.
+//      v  +---------+         v
+//
+// If the number of tablets is not a multiple of the number of threads when
+// using an auto-generated range-partitioned table, we lose the guarantee
+// that we always write to a monotonically increasing range on each tablet.
+//
+//   --num_threads=2 --table_num_range_partitions=3 --table_num_hash_partitions=1
+//
+//      +  +---------+         ^
+//      |  | thread1 | tabletA |
+//      |  |         +---------+
+//      v  +---------| tabletB |
+//      |  | thread2 +---------+
+//      |  |         | tabletC |
+//      v  +---------+         v
 
 #include <algorithm>
 #include <cstdint>
@@ -122,6 +175,7 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <glog/logging.h>
 
 #include "kudu/client/client.h"
 #include "kudu/client/scan_batch.h"
@@ -137,8 +191,10 @@
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/tool_action.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/decimal_util.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/int128.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/random.h"
@@ -245,8 +301,17 @@ DEFINE_string(table_name, "",
               "an already existing table, it's highly recommended to use a "
               "dedicated table created just for testing purposes: "
               "the existing table nor its data is never dropped/deleted.");
-DEFINE_int32(table_num_buckets, 8,
-             "The number of buckets to create when this tool creates a new table.");
+DEFINE_int32(table_num_hash_partitions, 8,
+             "The number of hash partitions to create when this tool creates "
+             "a new table. Note: The total number of partitions must be "
+             "greater than 1.");
+DEFINE_int32(table_num_range_partitions, 1,
+             "The number of range partitions to create when this tool creates "
+             "a new table. A range partitioning schema will be determined to "
+             "evenly split a sequential workload across ranges, leaving "
+             "the outermost ranges unbounded to ensure coverage of the entire "
+             "keyspace. Note: The total number of partitions must be greater "
+             "than 1.");
 DEFINE_int32(table_num_replicas, 1,
              "The number of replicas for the auto-created table; "
              "0 means 'use server-side default'.");
@@ -260,6 +325,19 @@ namespace tools {
 
 namespace {
 
+bool ValidatePartitionFlags() {
+  int num_tablets = FLAGS_table_num_hash_partitions * FLAGS_table_num_range_partitions;
+  if (num_tablets <= 1) {
+    LOG(ERROR) << Substitute("Invalid partitioning: --table_num_hash_partitions=$0 "
+                  "--table_num_range_partitions=$1, must specify more than one partition "
+                  "for auto-generated tables", FLAGS_table_num_hash_partitions,
+                  FLAGS_table_num_range_partitions);
+    return false;
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags);
+
 class Generator {
  public:
   enum Mode {
@@ -320,6 +398,15 @@ string Generator::Next() {
   return buf_;
 }
 
+// Utility function that determines the range of generated values each thread
+// should insert across if inserting in non-random mode. In random mode, this
+// is used to generate different RNG seeds per thread.
+int64_t SpanPerThread(int num_columns) {
+  return (FLAGS_num_rows_per_thread == 0) ?
+      numeric_limits<int64_t>::max() / FLAGS_num_threads
+      : FLAGS_num_rows_per_thread * num_columns;
+}
+
 Status GenerateRowData(Generator* gen, KuduPartialRow* row,
                        const string& fixed_string) {
   const vector<ColumnSchema>& columns(row->schema()->columns());
@@ -387,8 +474,7 @@ mutex cerr_lock;
 
 void GeneratorThread(
     const shared_ptr<KuduClient>& client, const string& table_name,
-    size_t gen_idx, size_t gen_num,
-    Status* status, uint64_t* row_count, uint64_t* err_count) {
+    size_t gen_idx, Status* status, uint64_t* row_count, uint64_t* err_count) {
 
   const Generator::Mode gen_mode = FLAGS_use_random ? Generator::MODE_RAND
                                                     : Generator::MODE_SEQ;
@@ -415,9 +501,7 @@ void GeneratorThread(
 
     // Planning for non-intersecting ranges for different generator threads
     // in sequential generation mode.
-    const int64_t gen_span =
-        (num_rows_per_gen == 0) ? numeric_limits<int64_t>::max() / gen_num
-                                : num_rows_per_gen * num_columns;
+    const int64_t gen_span = SpanPerThread(num_columns);
     const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
     Generator gen(gen_mode, gen_seed, FLAGS_string_len);
     for (; num_rows_per_gen == 0 || idx < num_rows_per_gen; ++idx) {
@@ -467,7 +551,7 @@ Status GenerateInsertRows(const shared_ptr<KuduClient>& client,
   vector<uint64_t> err_count(gen_num, 0);
   vector<thread> threads;
   for (size_t i = 0; i < gen_num; ++i) {
-    threads.emplace_back(&GeneratorThread, client, table_name, i, gen_num,
+    threads.emplace_back(&GeneratorThread, client, table_name, i,
                          &status[i], &row_count[i], &err_count[i]);
   }
   for (auto& t : threads) {
@@ -579,13 +663,29 @@ Status TestLoadGenerator(const RunnerContext& context) {
     RETURN_NOT_OK(b.Build(&schema));
 
     unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
-    RETURN_NOT_OK(table_creator->table_name(table_name)
+    table_creator->table_name(table_name)
                   .schema(&schema)
                   .num_replicas(FLAGS_table_num_replicas)
-                  .add_hash_partitions(vector<string>({ kKeyColumnName }),
-                                       FLAGS_table_num_buckets)
-                  .wait(true)
-                  .Create());
+                  .wait(true);
+    if (FLAGS_table_num_range_partitions > 1) {
+      // Split the generated span for a sequential workload evenly across all
+      // tablets. In case we're inserting in random mode, use unbounded range
+      // partitioning, so the table has key coverage of the entire keyspace.
+      const int64_t total_inserted_span = SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
+      const int64_t span_per_range = total_inserted_span / FLAGS_table_num_range_partitions;
+      table_creator->set_range_partition_columns({ kKeyColumnName });
+      for (int i = 1; i < FLAGS_table_num_range_partitions; i++) {
+        unique_ptr<KuduPartialRow> split(schema.NewRow());
+        int64_t split_val = FLAGS_seq_start + i * span_per_range;
+        RETURN_NOT_OK(split->SetInt64(kKeyColumnName, split_val));
+        table_creator->add_range_partition_split(split.release());
+      }
+    }
+    if (FLAGS_table_num_hash_partitions > 1) {
+      table_creator->add_hash_partitions(
+          vector<string>({ kKeyColumnName }), FLAGS_table_num_hash_partitions);
+    }
+    RETURN_NOT_OK(table_creator->Create());
   }
   cout << "Using " << (is_auto_table ? "auto-created " : "")
        << "table '" << table_name << "'" << endl;
@@ -670,7 +770,8 @@ unique_ptr<Mode> BuildPerfMode() {
       .AddOptionalParameter("string_fixed")
       .AddOptionalParameter("string_len")
       .AddOptionalParameter("table_name")
-      .AddOptionalParameter("table_num_buckets")
+      .AddOptionalParameter("table_num_hash_partitions")
+      .AddOptionalParameter("table_num_range_partitions")
       .AddOptionalParameter("table_num_replicas")
       .AddOptionalParameter("use_random")
       .Build();


Mime
View raw message