kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/2] kudu git commit: [tools] introduced '--seq_start' flag
Date Sun, 09 Oct 2016 21:07:16 GMT
[tools] introduced '--seq_start' flag

Introduced the '--seq_start' flag for the 'kudu test loadgen' tool.
Use this new flag to specify the initial value for the generator in
sequential mode. The new flag makes it easier to run the tool multiple
times against an already existing table.

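Here is an example: an already existing table t3 has 5 columns.
Each run below consumes 2 threads * 1000 rows per thread * 5 columns =
10000 sequence values, so '--seq_start' is advanced by 10000 for every
subsequent run. The sequence of 3 runs below inserts 6000 rows in total
(3 runs * 1000 rows per thread * 2 threads)
with no duplicate values across all columns: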

  kudu test loadgen 127.0.0.1 --table_name=t3 --num_threads=2 \
    --num_rows_per_thread=1000 --seq_start=0

  kudu test loadgen 127.0.0.1 --table_name=t3 --num_threads=2 \
    --num_rows_per_thread=1000 --seq_start=10000

  kudu test loadgen 127.0.0.1 --table_name=t3 --num_threads=2 \
    --num_rows_per_thread=1000 --seq_start=20000

Also introduced the '--table_num_replicas' flag
to specify the number of replicas for auto-created tables.

Renamed '--num_buckets' to '--table_num_buckets'.

In addition, the loadgen tool no longer aborts if a generator thread
encounters an error. Instead, that thread stops generating data and is
joined gracefully.

Change-Id: I0899168b31d4e3a2d53fff7db669a0840452f557
Reviewed-on: http://gerrit.cloudera.org:8080/4672
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ac519fde
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ac519fde
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ac519fde

Branch: refs/heads/master
Commit: ac519fde8a2e1f1097346416518c57dcd2b9238c
Parents: 5233e49
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Fri Oct 7 21:52:03 2016 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Sat Oct 8 08:34:03 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/tool_action_test.cc | 200 ++++++++++++++++++++------------
 1 file changed, 124 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ac519fde/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index 5166025..360c677 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -91,9 +91,12 @@
 #include <ctime>
 
 #include <algorithm>
+#include <chrono>
+#include <iomanip>
 #include <iostream>
 #include <limits>
 #include <memory>
+#include <mutex>
 #include <sstream>
 #include <thread>
 #include <vector>
@@ -110,17 +113,6 @@
 #include "kudu/util/random.h"
 #include "kudu/util/stopwatch.h"
 
-using std::accumulate;
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::numeric_limits;
-using std::string;
-using std::ostringstream;
-using std::thread;
-using std::vector;
-using std::unique_ptr;
-
 using kudu::ColumnSchema;
 using kudu::KuduPartialRow;
 using kudu::Stopwatch;
@@ -139,6 +131,20 @@ using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
+using std::accumulate;
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::lock_guard;
+using std::mutex;
+using std::numeric_limits;
+using std::ostringstream;
+using std::string;
+using std::thread;
+using std::vector;
+using std::unique_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
 
 DEFINE_double(buffer_flush_watermark_pct, 0.5,
               "Mutation buffer flush watermark, in percentage of total size.");
@@ -169,6 +175,11 @@ DEFINE_bool(run_scan, false,
             "the inserted rows matches the expected number. If enabled, "
             "the scan is run only if no errors were encountered "
             "while inserting the generated rows.");
+DEFINE_uint64(seq_start, 0,
+              "Initial value for the generator in sequential mode. "
+              "This is useful when running multiple times against already "
+              "existing table: for every next run, set this flag to "
+              "(num_threads * num_rows_per_thread * column_num + seq_start).");
 DEFINE_int32(show_first_n_errors, 0,
              "Output detailed information on the specified number of "
              "first n errors (if any).");
@@ -194,9 +205,11 @@ DEFINE_string(table_name, "",
               "an already existing table, it's highly recommended to use a "
               "dedicated table created just for testing purposes: "
               "the existing table nor its data is never dropped/deleted.");
-DEFINE_int32(num_buckets, 8,
+DEFINE_int32(table_num_buckets, 8,
              "The number of buckets to create when this tool creates a new table.");
-
+DEFINE_int32(table_num_replicas, 1,
+             "The number of replicas for the auto-created table; "
+             "0 means 'use server-side default'.");
 DEFINE_bool(use_random, false,
             "Whether to use random numbers instead of sequential ones. "
             "In case of using random numbers collisions are possible over "
@@ -321,38 +334,57 @@ Status GenerateRowData(Generator* gen, KuduPartialRow* row,
   return Status::OK();
 }
 
+mutex cerr_lock;
+
 void GeneratorThread(
     const shared_ptr<KuduClient>& client, const string& table_name,
-    Generator::Mode gen_mode, int64_t gen_seed,
-    uint64_t* row_count, uint64_t* err_count) {
+    size_t gen_idx, size_t gen_num,
+    Status* status, uint64_t* row_count, uint64_t* err_count) {
 
+  const Generator::Mode gen_mode = FLAGS_use_random ? Generator::MODE_RAND
+                                                    : Generator::MODE_SEQ;
   const size_t flush_per_n_rows = FLAGS_flush_per_n_rows;
-
+  const uint64_t gen_seq_start = FLAGS_seq_start;
   shared_ptr<KuduSession> session(client->NewSession());
-  CHECK_OK(session->SetMutationBufferFlushWatermark(
-      FLAGS_buffer_flush_watermark_pct));
-  CHECK_OK(session->SetMutationBufferSpace(FLAGS_buffer_size_bytes));
-  CHECK_OK(session->SetMutationBufferMaxNum(FLAGS_buffers_num));
-  CHECK_OK(session->SetFlushMode(
-      flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
-                            : KuduSession::MANUAL_FLUSH));
-
-  shared_ptr<KuduTable> table;
-  CHECK_OK(client->OpenTable(table_name, &table));
-
-  Generator gen(gen_mode, gen_seed, FLAGS_string_len);
   uint64_t idx = 0;
-  const size_t num_rows_per_thread = FLAGS_num_rows_per_thread;
-  for (; num_rows_per_thread == 0 || idx < num_rows_per_thread;
-       ++idx) {
-    unique_ptr<KuduInsert> insert_op(table->NewInsert());
-    CHECK_OK(GenerateRowData(&gen, insert_op->mutable_row(), FLAGS_string_fixed));
-    CHECK_OK(session->Apply(insert_op.release()));
-    if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0)
{
-      session->FlushAsync(nullptr);
+
+  auto generator = [&]() {
+    RETURN_NOT_OK(session->SetMutationBufferFlushWatermark(
+                     FLAGS_buffer_flush_watermark_pct));
+    RETURN_NOT_OK(session->SetMutationBufferSpace(
+                     FLAGS_buffer_size_bytes));
+    RETURN_NOT_OK(session->SetMutationBufferMaxNum(FLAGS_buffers_num));
+    RETURN_NOT_OK(session->SetFlushMode(
+        flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
+                              : KuduSession::MANUAL_FLUSH));
+    const size_t num_rows_per_gen = FLAGS_num_rows_per_thread;
+
+    shared_ptr<KuduTable> table;
+    RETURN_NOT_OK(client->OpenTable(table_name, &table));
+    const size_t num_columns = table->schema().num_columns();
+
+    // Planning for non-intersecting ranges for different generator threads
+    // in sequential generation mode.
+    const int64_t gen_span =
+        (num_rows_per_gen == 0) ? numeric_limits<int64_t>::max() / gen_num
+                                : num_rows_per_gen * num_columns;
+    const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
+    Generator gen(gen_mode, gen_seed, FLAGS_string_len);
+    for (; num_rows_per_gen == 0 || idx < num_rows_per_gen; ++idx) {
+      unique_ptr<KuduInsert> insert_op(table->NewInsert());
+      RETURN_NOT_OK(GenerateRowData(&gen, insert_op->mutable_row(),
+                                   FLAGS_string_fixed));
+      RETURN_NOT_OK(session->Apply(insert_op.release()));
+      if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows ==
0) {
+        session->FlushAsync(nullptr);
+      }
     }
-  }
-  CHECK_OK(session->Flush());
+    RETURN_NOT_OK(session->Flush());
+
+    return Status::OK();
+  };
+
+  *status = generator();
   if (row_count != nullptr) {
     *row_count = idx;
   }
@@ -362,27 +394,31 @@ void GeneratorThread(
   if (err_count != nullptr) {
     *err_count = errors.size();
   }
-  for (size_t i = 0; i < errors.size() && i < FLAGS_show_first_n_errors; ++i)
{
-    cerr << errors[i]->status().ToString() << endl;
+  if (!errors.empty() && FLAGS_show_first_n_errors > 0) {
+    ostringstream str;
+    str << "Error from generator " << std::setw(4) << gen_idx <<
":" << endl;
+    for (size_t i = 0; i < errors.size() && i < FLAGS_show_first_n_errors;
++i) {
+      str << "  " << errors[i]->status().ToString() << endl;
+    }
+    // Serialize access to the stderr to prevent garbled output.
+    lock_guard<mutex> lock(cerr_lock);
+    cerr << str.str() << endl;
   }
 }
 
-void GenerateInsertRows(const shared_ptr<KuduClient>& client,
-                        const string& table_name,
-                        uint64_t* total_row_count, uint64_t* total_err_count) {
+Status GenerateInsertRows(const shared_ptr<KuduClient>& client,
+                          const string& table_name,
+                          uint64_t* total_row_count,
+                          uint64_t* total_err_count) {
 
-  const size_t num_threads = FLAGS_num_threads;
-  const Generator::Mode generator_mode = FLAGS_use_random ? Generator::MODE_RAND
-                                                          : Generator::MODE_SEQ;
-  vector<uint64_t> row_count(num_threads, 0);
-  vector<uint64_t> err_count(num_threads, 0);
+  const size_t gen_num = FLAGS_num_threads;
+  vector<Status> status(gen_num);
+  vector<uint64_t> row_count(gen_num, 0);
+  vector<uint64_t> err_count(gen_num, 0);
   vector<thread> threads;
-  // The 'seed span' allows to have non-intersecting ranges for column values
-  // in sequential generation mode.
-  const int64_t seed_span = numeric_limits<int64_t>::max() / num_threads;
-  for (size_t i = 0; i < num_threads; ++i) {
-    threads.emplace_back(&GeneratorThread, client, table_name, generator_mode,
-                         seed_span * i, &row_count[i], &err_count[i]);
+  for (size_t i = 0; i < gen_num; ++i) {
+    threads.emplace_back(&GeneratorThread, client, table_name, i, gen_num,
+                         &status[i], &row_count[i], &err_count[i]);
   }
   for (auto& t : threads) {
     t.join();
@@ -393,6 +429,13 @@ void GenerateInsertRows(const shared_ptr<KuduClient>& client,
   if (total_err_count != nullptr) {
     *total_err_count = accumulate(err_count.begin(), err_count.end(), 0UL);
   }
+  // Return first non-OK error status, if any, as a result.
+  const auto it = find_if(status.begin(), status.end(),
+                          [&](const Status& s) { return !s.ok(); });
+  if (it != status.end()) {
+    return *it;
+  }
+  return Status::OK();
 }
 
 // Fetch all rows from the table with the specified name; iterate over them
@@ -473,13 +516,6 @@ Status TestLoadGenerator(const RunnerContext& context) {
     table_name = FLAGS_table_name;
   } else {
     static const string kKeyColumnName = "key";
-    static const Schema kSchema = Schema(
-        {
-          ColumnSchema(kKeyColumnName, INT64),
-          ColumnSchema("int32_val", INT32),
-          ColumnSchema("string_val", STRING),
-          ColumnSchema("binary_val", BINARY),
-        }, 1);
 
     // The auto-created table case.
     is_auto_table = true;
@@ -487,7 +523,7 @@ Status TestLoadGenerator(const RunnerContext& context) {
     table_name = "loadgen_auto_" + oid_generator.Next();
     KuduSchema schema;
     KuduSchemaBuilder b;
-    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn(kKeyColumnName)->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
     b.AddColumn("int_val")->Type(KuduColumnSchema::INT32);
     b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
     RETURN_NOT_OK(b.Build(&schema));
@@ -495,8 +531,9 @@ Status TestLoadGenerator(const RunnerContext& context) {
     unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
     RETURN_NOT_OK(table_creator->table_name(table_name)
                   .schema(&schema)
-                  .num_replicas(1)
-                  .add_hash_partitions(vector<string>({ kKeyColumnName }), FLAGS_num_buckets)
+                  .num_replicas(FLAGS_table_num_replicas)
+                  .add_hash_partitions(vector<string>({ kKeyColumnName }),
+                                       FLAGS_table_num_buckets)
                   .wait(true)
                   .Create());
   }
@@ -507,7 +544,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
   uint64_t total_err_count = 0;
   Stopwatch sw(Stopwatch::ALL_THREADS);
   sw.start();
-  GenerateInsertRows(client, table_name, &total_row_count, &total_err_count);
+  Status status = GenerateInsertRows(client, table_name,
+                                     &total_row_count, &total_err_count);
   sw.stop();
   const double total = sw.elapsed().wall_millis();
   cout << endl << "Generator report" << endl
@@ -515,9 +553,19 @@ Status TestLoadGenerator(const RunnerContext& context) {
   if (total_row_count != 0) {
     cout << "  time per row: " << total / total_row_count << " ms" <<
endl;
   }
-  if (total_err_count != 0) {
-    return Status::RuntimeError(strings::Substitute("Encountered $0 errors",
-                                                    total_err_count));
+  if (!status.ok() || total_err_count != 0) {
+    string err_str;
+    if (!status.ok()) {
+      SubstituteAndAppend(&err_str, status.ToString());
+    }
+    if (total_err_count != 0) {
+      if (!status.ok()) {
+        SubstituteAndAppend(&err_str,  "; ");
+      }
+      SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
+                          total_err_count);
+    }
+    return Status::RuntimeError(err_str);
   }
 
   if (FLAGS_run_scan) {
@@ -529,8 +577,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
          << "  actual rows  : " << count << endl;
     if (count != total_row_count) {
       return Status::RuntimeError(
-            strings::Substitute("Row count mismatch: expected $0, actual $1",
-                                total_row_count, count));
+            Substitute("Row count mismatch: expected $0, actual $1",
+                       total_row_count, count));
     }
   }
 
@@ -551,11 +599,9 @@ unique_ptr<Mode> BuildTestMode() {
       .Description("Run load generation test with optional scan afterwards")
       .ExtraDescription(
           "Run load generation tool which inserts auto-generated data "
-          "into already existing table as fast as possible. If requested, "
-          "also run scan over the inserted rows to check whether the reported "
-          "count or inserted rows matches with the expected one. "
-          "NOTE: it's highly recommended to create a separate table for that "
-          "because the tool does not clean the inserted data.")
+          "into already existing or auto-created table as fast as possible. "
+          "If requested, also run scan over the inserted rows to check whether "
+          "the actual count or inserted rows matches the expected one.")
       .AddRequiredParameter({ kMasterAddressesArg,
           "Comma-separated list of master addresses to run against. "
           "Addresses are in 'hostname:port' form where port may be omitted "
@@ -568,10 +614,12 @@ unique_ptr<Mode> BuildTestMode() {
       .AddOptionalParameter("num_rows_per_thread")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("run_scan")
+      .AddOptionalParameter("seq_start")
       .AddOptionalParameter("string_fixed")
       .AddOptionalParameter("string_len")
       .AddOptionalParameter("table_name")
-      .AddOptionalParameter("num_buckets")
+      .AddOptionalParameter("table_num_buckets")
+      .AddOptionalParameter("table_num_replicas")
       .AddOptionalParameter("use_random")
       .Build();
 


Mime
View raw message