kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wdberke...@apache.org
Subject [kudu] 01/02: [tools] Add table scan tool
Date Tue, 29 Jan 2019 17:45:08 GMT
This is an automated email from the ASF dual-hosted git repository.

wdberkeley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0afeddf9e530762e0e47beb7428982763715c746
Author: Yingchun Lai <405403881@qq.com>
AuthorDate: Sat Jan 5 03:41:50 2019 -0500

    [tools] Add table scan tool
    
    This commit adds a basic tool to scan rows from a table. Several
    predicates can be specified on the query. Unlike traditional SQL
    syntax, the scan tool's simple query predicates are represented in a
    simple JSON syntax. Three types of predicates are supported, including
    'Comparison', 'InList' and 'IsNull'.
      * The 'Comparison' type supports <=, <, =, > and >=,
        which can be represented as '[operator, column_name, value]',
        e.g. '[">=", "col1", "value"]'
      * The 'InList' type can be represented as
        '["IN", column_name, [value1, value2, ...]]'
        e.g. '["IN", "col2", ["value1", "value2"]]'
      * The 'IsNull' type determines whether the value is NULL or not,
        which can be represented as '[operator, column_name]'
        e.g. '["NULL", "col1"]', or '["NOTNULL", "col2"]'
    Predicates can be combined together with predicate operators using the syntax
    [operator, predicate, predicate, ..., predicate].
    For example,
    ["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]]
    The only supported predicate operator is `AND`.
    
    Change-Id: Ieac340b70a9eaf131f82a2b7d61336211d1d48f8
    Reviewed-on: http://gerrit.cloudera.org:8080/12167
    Tested-by: Kudu Jenkins
    Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
---
 src/kudu/client/scanner-internal.h    |   3 +
 src/kudu/tools/CMakeLists.txt         |   1 +
 src/kudu/tools/kudu-tool-test.cc      | 185 +++++++++++++++++
 src/kudu/tools/table_scanner.cc       | 380 ++++++++++++++++++++++++++++++++++
 src/kudu/tools/table_scanner.h        |  67 ++++++
 src/kudu/tools/tool_action_cluster.cc |   4 +-
 src/kudu/tools/tool_action_common.cc  |   6 +
 src/kudu/tools/tool_action_perf.cc    |   4 +-
 src/kudu/tools/tool_action_table.cc   |  73 +++++--
 9 files changed, 694 insertions(+), 29 deletions(-)

diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 5e7652b..7b233bf 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -311,6 +311,9 @@ class KuduScanBatch::Data {
         << row_format_flags_;
     DCHECK_GE(idx, 0);
     DCHECK_LT(idx, num_rows());
+    if (direct_data_.empty()) {
+      return KuduRowResult(projection_, nullptr);
+    }
     int offset = idx * projected_row_size_;
     return KuduRowResult(projection_, &direct_data_[offset]);
   }
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 1be02ca..ebe894c 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(kudu_tools_util
   color.cc
   data_gen_util.cc
   diagnostics_log_parser.cc
+  table_scanner.cc
   tool_action.cc
   tool_action_common.cc
 )
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b8fd9d6..1c0633f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -47,6 +47,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
@@ -140,10 +141,12 @@ using kudu::cfile::StringDataGenerator;
 using kudu::cfile::WriterOptions;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
+using kudu::client::KuduInsert;
 using kudu::client::KuduScanToken;
 using kudu::client::KuduScanTokenBuilder;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniCluster;
@@ -181,9 +184,11 @@ using std::back_inserter;
 using std::copy;
 using std::make_pair;
 using std::map;
+using std::max;
 using std::ostringstream;
 using std::pair;
 using std::string;
+using std::to_string;
 using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
@@ -378,6 +383,41 @@ class ToolTest : public KuduTest {
     return Status::OK();
   }
 
+  void RunScanTableCheck(const string& table_name,
+                         const string& predicates_json,
+                         int64_t min_value,
+                         int64_t max_value,
+                         const vector<pair<string, string>>& columns = {{"int32",
"key"}}) {
+    vector<string> col_names;
+    for (const auto& column : columns) {
+      col_names.push_back(column.second);
+    }
+    const string projection = JoinStrings(col_names, ",");
+
+    vector<string> lines;
+    int64_t total = max(max_value - min_value + 1, 0L);
+    NO_FATALS(RunActionStdoutLines(
+                Substitute("table scan $0 $1 -show_value=true "
+                           "-columns=$2 -predicates=$3",
+                           cluster_->master()->bound_rpc_addr().ToString(),
+                           table_name, projection, predicates_json), &lines));
+    for (int64_t value = min_value; value <= max_value; ++value) {
+      // Check projection.
+      vector<string> kvs;
+      for (const auto& column : columns) {
+        // Check matched rows.
+        kvs.push_back(Substitute("$0 $1=$2",
+            column.first, column.second, column.second == "key" ? to_string(value) : ".*"));
+      }
+      string line_pattern(R"*(\()*");
+      line_pattern += JoinStrings(kvs, ", ");
+      line_pattern += (")");
+      ASSERT_STRINGS_ANY_MATCH(lines, line_pattern);
+    }
+    // Check total count.
+    ASSERT_STRINGS_ANY_MATCH(lines, Substitute("Total count $0 ", total));
+  }
+
  protected:
   void RunLoadgen(int num_tservers = 1,
                   const vector<string>& tool_args = {},
@@ -589,6 +629,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "rename_table.*Rename a table",
         "rename_column.*Rename a column",
         "list.*List tables",
+        "scan.*Scan rows from a table",
     };
     NO_FATALS(RunTestHelp("table", kTableModeRegexes));
   }
@@ -2130,6 +2171,7 @@ TEST_F(ToolTest, TestMasterList) {
 // (2)rename a table
 // (3)rename a column
 // (4)list tables
+// (5)scan a table
 TEST_F(ToolTest, TestDeleteTable) {
   NO_FATALS(StartExternalMiniCluster());
   shared_ptr<KuduClient> client;
@@ -2311,6 +2353,149 @@ TEST_F(ToolTest, TestListTables) {
   }
 }
 
+TEST_F(ToolTest, TestScanTablePredicates) {
+  NO_FATALS(StartExternalMiniCluster());
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  const string kTableName = "kudu.table.scan.predicates";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 10);
+  });
+  ww.StopAndJoin();
+  int64_t total_rows = ww.rows_inserted();
+
+  // Insert one more row with a NULL value column.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(20000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  unique_ptr<KuduInsert> insert(table->NewInsert());
+  ASSERT_OK(insert->mutable_row()->SetInt32("key", ++total_rows));
+  ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 1));
+  ASSERT_OK(session->Apply(insert.release()));
+  ASSERT_OK(session->Flush());
+
+  // Check predicates.
+  RunScanTableCheck(kTableName, "", 1, total_rows);
+  RunScanTableCheck(kTableName, R"*(["AND",["=","key",1]])*", 1, 1);
+  int64_t mid = total_rows / 2;
+  RunScanTableCheck(kTableName,
+                    Substitute(R"*(["AND",[">","key",$0]])*", mid),
+                    mid + 1, total_rows);
+  RunScanTableCheck(kTableName,
+                    Substitute(R"*(["AND",[">=","key",$0]])*", mid),
+                    mid, total_rows);
+  RunScanTableCheck(kTableName,
+                    Substitute(R"*(["AND",["<","key",$0]])*", mid),
+                    1, mid - 1);
+  RunScanTableCheck(kTableName,
+                    Substitute(R"*(["AND",["<=","key",$0]])*", mid),
+                    1, mid);
+  RunScanTableCheck(kTableName,
+                    R"*(["AND",["IN","key",[1,2,3,4,5]]])*",
+                    1, 5);
+  RunScanTableCheck(kTableName,
+                    R"*(["AND",["NOTNULL","string_val"]])*",
+                    1, total_rows - 1);
+  RunScanTableCheck(kTableName,
+                    R"*(["AND",["NULL","string_val"]])*",
+                    total_rows, total_rows);
+  RunScanTableCheck(kTableName,
+                    R"*(["AND",["IN","key",[0,1,2,3]],)*"
+                    R"*(["<","key",8],[">=","key",1],["NOTNULL","key"],)*"
+                    R"*(["NOTNULL","string_val"]])*",
+                    1, 3);
+}
+
+TEST_F(ToolTest, TestScanTableProjection) {
+  NO_FATALS(StartExternalMiniCluster());
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  const string kTableName = "kudu.table.scan.projection";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 10);
+  });
+  ww.StopAndJoin();
+
+  // Check projections.
+  string one_row_json = R"*(["AND",["=","key",1]])*";
+  RunScanTableCheck(kTableName, one_row_json, 1, 1, {});
+  RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"}});
+  RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"string", "string_val"}});
+  RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"},
+                                                     {"string", "string_val"}});
+  RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"},
+                                                     {"int32", "int_val"},
+                                                     {"string", "string_val"}});
+}
+
+TEST_F(ToolTest, TestScanTableMultiPredicates) {
+  NO_FATALS(StartExternalMiniCluster());
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  const string kTableName = "kudu.table.scan.multipredicates";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 1000);
+  });
+  ww.StopAndJoin();
+  int64_t total_rows = ww.rows_inserted();
+  int64_t mid = total_rows / 2;
+
+  vector<string> lines;
+  NO_FATALS(RunActionStdoutLines(
+              Substitute("table scan $0 $1 -show_value=true "
+                         "-columns=key,string_val -predicates=$2",
+                         cluster_->master()->bound_rpc_addr().ToString(),
+                         kTableName,
+                         Substitute(R"*(["AND",[">","key",$0],)*"
+                                    R"*(["<=","key",$1],)*"
+                                    R"*([">=","string_val","a"],)*"
+                                    R"*(["<","string_val","b"]])*", mid, total_rows)),
+                         &lines));
+  for (auto line : lines) {
+    size_t pos1 = line.find("(int64 key=");
+    if (pos1 != string::npos) {
+      size_t pos2 = line.find(", string string_val=a", pos1);
+      ASSERT_NE(pos2, string::npos);
+      int32_t key;
+      ASSERT_TRUE(safe_strto32(line.substr(pos1, pos2).c_str(), &key));
+      ASSERT_GT(key, mid);
+      ASSERT_LE(key, total_rows);
+    }
+  }
+  ASSERT_LE(lines.size(), mid);
+}
+
 Status CreateLegacyHmsTable(HmsClient* client,
                             const string& hms_database_name,
                             const string& hms_table_name,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
new file mode 100644
index 0000000..a2f2e7b
--- /dev/null
+++ b/src/kudu/tools/table_scanner.cc
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/table_scanner.h"
+
+#include <stddef.h>
+
+#include <iostream>
+#include <map>
+#include <memory>
+#include <set>
+
+#include <boost/bind.hpp>
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/value.h"
+#include "kudu/common/column_predicate.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/string_case.h"
+
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduValue;
+using strings::Substitute;
+using std::cout;
+using std::endl;
+using std::map;
+using std::set;
+using std::unique_ptr;
+
+DECLARE_string(columns);
+DEFINE_bool(fill_cache, true,
+            "Whether to fill block cache when scanning.");
+DECLARE_int32(num_threads);
+
+DEFINE_string(predicates, "",
+              "Query predicates on columns. Unlike traditional SQL syntax, "
+              "the scan tool's simple query predicates are represented in a "
+              "simple JSON syntax. Three types of predicates are supported, "
+              "including 'Comparison', 'InList' and 'IsNull'.\n"
+              " * The 'Comparison' type support <=, <, ==, > and >=,\n"
+              "   which can be represented as '[operator, column_name, value]',""\n"
+              R"*(   e.g. '[">=", "col1", "value"]')*""\n"
+              " * The 'InList' type can be represented as\n"
+              R"*(   '["IN", column_name, [value1, value2, ...]]')*""\n"
+              R"*(   e.g. '["IN", "col2", ["value1", "value2"]]')*""\n"
+              " * The 'IsNull' type determine whether the value is NULL or not,\n"
+              "   which can be represented as '[operator, column_name]'\n"
+              R"*(   e.g. '["NULL", "col1"]', or '["NOTNULL", "col2"]')*""\n"
+              "Predicates can be combined together with predicate operators using the syntax\n"
+              "   [operator, predicate, predicate, ..., predicate].\n"
+              "For example,\n"
+              R"*(   ["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]])*""\n"
+              "The only supported predicate operator is `AND`.");
+DEFINE_bool(show_value, false,
+            "Whether to show values of scanned rows.");
+DECLARE_string(tablets);
+
+namespace kudu {
+namespace tools {
+
+PredicateType ParsePredicateType(const string& predicate_type) {
+  string predicate_type_uc;
+  ToUpperCase(predicate_type, &predicate_type_uc);
+  if (predicate_type_uc == "=") {
+    return PredicateType::Equality;
+  } else if (predicate_type_uc == "<" ||
+      predicate_type_uc == "<=" ||
+      predicate_type_uc == ">" ||
+      predicate_type_uc == ">=") {
+    return PredicateType::Range;
+  } else if (predicate_type_uc == "NULL") {
+    return PredicateType::IsNull;
+  } else if (predicate_type_uc == "NOTNULL") {
+    return PredicateType::IsNotNull;
+  } else if (predicate_type_uc == "IN") {
+    return PredicateType::InList;
+  } else {
+    LOG(FATAL) << Substitute("unhandled predicate type $0", predicate_type);
+    return PredicateType::None;
+  }
+}
+
+KuduValue* ParseValue(KuduColumnSchema::DataType type,
+                      const rapidjson::Value* value) {
+  CHECK(value != nullptr);
+  switch (type) {
+    case KuduColumnSchema::DataType::INT8:
+    case KuduColumnSchema::DataType::INT16:
+    case KuduColumnSchema::DataType::INT32:
+      CHECK(value->IsInt());
+      return KuduValue::FromInt(value->GetInt());
+    case KuduColumnSchema::DataType::INT64:
+      CHECK(value->IsInt64());
+      return KuduValue::FromInt(value->GetInt64());
+    case KuduColumnSchema::DataType::STRING:
+      CHECK(value->IsString());
+      return KuduValue::CopyString(value->GetString());
+    case KuduColumnSchema::DataType::BOOL:
+      CHECK(value->IsBool());
+      return KuduValue::FromBool(value->GetBool());
+    case KuduColumnSchema::DataType::FLOAT:
+      CHECK(value->IsDouble());
+      return KuduValue::FromFloat(static_cast<float>(value->GetDouble()));
+    case KuduColumnSchema::DataType::DOUBLE:
+      CHECK(value->IsDouble());
+      return KuduValue::FromDouble(value->GetDouble());
+    default:
+      LOG(FATAL) << Substitute("unhandled data type $0", type);
+  }
+
+  return nullptr;
+}
+
+KuduPredicate* NewComparisonPredicate(const client::sp::shared_ptr<KuduTable>&
table,
+                                      KuduColumnSchema::DataType type,
+                                      const string& predicate_type,
+                                      const string& column_name,
+                                      const rapidjson::Value* value) {
+  KuduValue* kudu_value = ParseValue(type, value);
+  CHECK(kudu_value != nullptr);
+  client::KuduPredicate::ComparisonOp cop;
+  if (predicate_type == "<") {
+    cop = client::KuduPredicate::ComparisonOp::LESS;
+  } else if (predicate_type == "<=") {
+    cop = client::KuduPredicate::ComparisonOp::LESS_EQUAL;
+  } else if (predicate_type == "=") {
+    cop = client::KuduPredicate::ComparisonOp::EQUAL;
+  } else if (predicate_type == ">") {
+    cop = client::KuduPredicate::ComparisonOp::GREATER;
+  } else if (predicate_type == ">=") {
+    cop = client::KuduPredicate::ComparisonOp::GREATER_EQUAL;
+  } else {
+    return nullptr;
+  }
+  return table->NewComparisonPredicate(column_name, cop, kudu_value);
+}
+
+KuduPredicate* NewIsNullPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                                  const string& column_name,
+                                  PredicateType pt) {
+  switch (pt) {
+    case PredicateType::IsNotNull:
+      return table->NewIsNotNullPredicate(column_name);
+    case PredicateType::IsNull:
+      return table->NewIsNullPredicate(column_name);
+    default:
+      return nullptr;
+  }
+}
+
+KuduPredicate* NewInListPredicate(const client::sp::shared_ptr<KuduTable> &table,
+                                  KuduColumnSchema::DataType type,
+                                  const string &name,
+                                  const JsonReader &reader,
+                                  const rapidjson::Value *object) {
+  CHECK(object->IsArray());
+  vector<const rapidjson::Value*> values;
+  reader.ExtractObjectArray(object, nullptr, &values);
+  vector<KuduValue *> kudu_values;
+  for (const auto& value : values) {
+    kudu_values.emplace_back(ParseValue(type, value));
+  }
+  return table->NewInListPredicate(name, &kudu_values);
+}
+
+Status AddPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                    const string& predicate_type,
+                    const string& column_name,
+                    const boost::optional<const rapidjson::Value*>& value,
+                    const JsonReader& reader,
+                    KuduScanTokenBuilder& builder) {
+  if (predicate_type.empty() || column_name.empty()) {
+    return Status::OK();
+  }
+
+  Schema schema_internal = KuduSchema::ToSchema(table->schema());
+  int idx = schema_internal.find_column(column_name);
+  if (PREDICT_FALSE(idx == Schema::kColumnNotFound)) {
+    return Status::NotFound("no such column", column_name);
+  }
+  auto type = table->schema().Column(static_cast<size_t>(idx)).type();
+  KuduPredicate* predicate = nullptr;
+  PredicateType pt = ParsePredicateType(predicate_type);
+  switch (pt) {
+    case PredicateType::Equality:
+    case PredicateType::Range:
+      CHECK(value);
+      predicate = NewComparisonPredicate(table, type, predicate_type, column_name, value.get());
+      break;
+    case PredicateType::IsNotNull:
+    case PredicateType::IsNull:
+      CHECK(!value);
+      predicate = NewIsNullPredicate(table, column_name, pt);
+      break;
+    case PredicateType::InList: {
+      CHECK(value);
+      predicate = NewInListPredicate(table, type, column_name, reader, value.get());
+      break;
+    }
+    default:
+      return Status::NotSupported(Substitute("not support predicate_type $0", predicate_type));
+  }
+  CHECK(predicate);
+  RETURN_NOT_OK(builder.AddConjunctPredicate(predicate));
+
+  return Status::OK();
+}
+
+Status AddPredicates(const client::sp::shared_ptr<KuduTable>& table,
+                     KuduScanTokenBuilder& builder) {
+  if (FLAGS_predicates.empty()) {
+    return Status::OK();
+  }
+  JsonReader reader(FLAGS_predicates);
+  RETURN_NOT_OK(reader.Init());
+  vector<const rapidjson::Value*> predicate_objects;
+  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
+                                          nullptr,
+                                          &predicate_objects));
+  vector<unique_ptr<KuduPredicate>> predicates;
+  for (int i = 0; i < predicate_objects.size(); ++i) {
+    if (i == 0) {
+      CHECK(predicate_objects[i]->IsString());
+      string op;
+      ToUpperCase(predicate_objects[i]->GetString(), &op);
+      if (op != "AND") {
+        return Status::InvalidArgument(Substitute("only 'AND' operator is supported now"));
+      }
+      continue;
+    }
+
+    CHECK(predicate_objects[i]->IsArray());
+    vector<const rapidjson::Value*> elements;
+    reader.ExtractObjectArray(predicate_objects[i], nullptr, &elements);
+    if (elements.size() == 2 || elements.size() == 3) {
+      CHECK(elements[0]->IsString());
+      CHECK(elements[1]->IsString());
+      RETURN_NOT_OK(AddPredicate(table,
+          elements[0]->GetString(),
+          elements[1]->GetString(),
+          elements.size() == 2 ?
+            boost::none : boost::optional<const rapidjson::Value*>(elements[2]),
+          reader,
+          builder));
+    } else {
+      return Status::InvalidArgument(
+          Substitute("invalid predicate elements count $0", elements.size()));
+    }
+  }
+
+  return Status::OK();
+}
+
+void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) {
+  for (auto token : tokens) {
+    Stopwatch sw(Stopwatch::THIS_THREAD);
+    sw.start();
+
+    KuduScanner* scanner;
+    CHECK_OK(token->IntoKuduScanner(&scanner));
+    CHECK_OK(scanner->Open());
+
+    uint64_t count = 0;
+    while (scanner->HasMoreRows()) {
+      KuduScanBatch batch;
+      CHECK_OK(scanner->NextBatch(&batch));
+      count += batch.NumRows();
+      total_count_.IncrementBy(batch.NumRows());
+      if (FLAGS_show_value) {
+        for (const auto& row : batch) {
+          cout << row.ToString() << endl;
+        }
+      }
+    }
+    delete scanner;
+
+    sw.stop();
+    cout << "T " << token->tablet().id() << " scanned count " <<
count
+        << " cost " << sw.elapsed().wall_seconds() << " seconds" <<
endl;
+  }
+}
+
+void TableScanner::MonitorTask() {
+  MonoTime last_log_time = MonoTime::Now();
+  while (thread_pool_->num_threads() > 1) {    // Some other table scan thread is running.
+    if (MonoTime::Now() - last_log_time >= MonoDelta::FromSeconds(5)) {
+      LOG(INFO) << "Scanned count: " << total_count_.Load() << endl;
+      last_log_time = MonoTime::Now();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+}
+
+Status TableScanner::Run() {
+ client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
+
+  KuduScanTokenBuilder builder(table.get());
+  RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));
+  RETURN_NOT_OK(builder.SetSelection(KuduClient::LEADER_ONLY));
+  RETURN_NOT_OK(builder.SetReadMode(KuduScanner::READ_LATEST));
+  RETURN_NOT_OK(builder.SetTimeoutMillis(30000));
+
+  vector<string> projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
+  RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
+  RETURN_NOT_OK(AddPredicates(table, builder));
+
+  vector<KuduScanToken*> tokens;
+  ElementDeleter deleter(&tokens);
+  RETURN_NOT_OK(builder.Build(&tokens));
+
+  const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
+  map<int, vector<KuduScanToken*>> thread_tokens;
+  int i = 0;
+  for (auto token : tokens) {
+    if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id()))
{
+      thread_tokens[i++ % FLAGS_num_threads].push_back(token);
+    }
+  }
+
+  RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
+                  .set_max_threads(FLAGS_num_threads + 1)  // add extra 1 thread for MonitorTask
+                  .set_idle_timeout(MonoDelta::FromMilliseconds(1))
+                  .Build(&thread_pool_));
+
+  Stopwatch sw(Stopwatch::THIS_THREAD);
+  sw.start();
+  for (i = 0; i < FLAGS_num_threads; ++i) {
+    RETURN_NOT_OK(thread_pool_->SubmitFunc(
+        boost::bind(&TableScanner::ScannerTask, this, thread_tokens[i])));
+  }
+  RETURN_NOT_OK(thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this)));
+  thread_pool_->Wait();
+  thread_pool_->Shutdown();
+
+  sw.stop();
+  cout << "Total count " << total_count_.Load()
+      << " cost " << sw.elapsed().wall_seconds() << " seconds" <<
endl;
+
+  return Status::OK();
+}
+
+} // namespace tools
+} // namespace kudu
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
new file mode 100644
index 0000000..c9589c5
--- /dev/null
+++ b/src/kudu/tools/table_scanner.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+namespace client {
+class KuduClient;
+class KuduScanToken;
+}  // namespace client
+}  // namespace kudu
+
+using kudu::client::KuduClient;
+using kudu::client::KuduScanToken;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace tools {
+class TableScanner {
+public:
+  TableScanner(client::sp::shared_ptr<KuduClient> client, string table_name):
+    total_count_(0),
+    client_(std::move(client)),
+    table_name_(std::move(table_name)) {
+  }
+
+  Status Run();
+
+private:
+  void ScannerTask(const vector<KuduScanToken *>& tokens);
+  void MonitorTask();
+
+private:
+  AtomicInt<uint64_t> total_count_;
+  client::sp::shared_ptr<KuduClient> client_;
+  std::string table_name_;
+  gscoped_ptr<ThreadPool> thread_pool_;
+};
+} // namespace tools
+} // namespace kudu
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 6a369f8..59d5f00 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -64,9 +64,7 @@ using strings::Substitute;
 } while (0);
 
 DECLARE_string(tables);
-DEFINE_string(tablets, "",
-              "Tablets to check (comma-separated list of IDs) "
-              "If not specified, checks all tablets.");
+DECLARE_string(tablets);
 
 DEFINE_string(sections, "*",
               "Sections to print (comma-separated list of sections, "
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 472cb30..24ab535 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -96,6 +96,9 @@ DEFINE_string(print_entries, "decoded",
               "  id = print only their ids");
 DEFINE_string(table_name, "",
               "Restrict output to a specific table by name");
+DEFINE_string(tablets, "",
+              "Tablets to check (comma-separated list of IDs) "
+              "If not specified, checks all tablets.");
 DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
 DEFINE_int32(truncate_data, 100,
              "Truncate the data fields to the given number of bytes "
@@ -116,6 +119,9 @@ DEFINE_string(tables, "", "Tables to include (comma-separated list of
table name
 DEFINE_string(memtracker_output, "table",
               "One of 'json', 'json_compact' or 'table'. Table output flattens "
               "the memtracker hierarchy.");
+DEFINE_int32(num_threads, 2,
+             "Number of threads to run. Each thread runs its own "
+             "KuduSession.");
 
 namespace boost {
 template <typename Signature>
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index e86e5c0..fdd3d94 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -264,9 +264,7 @@ DEFINE_uint64(num_rows_per_thread, 1000,
               "Number of rows each thread generates and inserts; "
               "0 means unlimited. All rows generated by a thread are inserted "
               "in the context of the same session.");
-DEFINE_int32(num_threads, 2,
-             "Number of generator threads to run. Each thread runs its own "
-             "KuduSession.");
+DECLARE_int32(num_threads);
 DEFINE_bool(run_scan, false,
             "Whether to run post-insertion scan to verify that the count of "
             "the inserted rows matches the expected number. If enabled, "
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 0519c88..a0c3f06 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -41,46 +41,47 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/table_scanner.h"
 #include "kudu/tools/tool_action.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/jsonreader.h"
 #include "kudu/util/status.h"
 
-DECLARE_string(tables);
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduScanner;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
+using kudu::client::internal::ReplicaController;
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
 DEFINE_bool(check_row_existence, false,
             "Also check for the existence of the row on the leader replica of "
             "the tablet. If found, the full row will be printed; if not found, "
             "an error message will be printed and the command will return a "
             "non-zero status.");
+DEFINE_bool(list_tablets, false,
+            "Include tablet and replica UUIDs in the output");
 DEFINE_bool(modify_external_catalogs, true,
             "Whether to modify external catalogs, such as the Hive Metastore, "
             "when renaming or dropping a table.");
-DEFINE_bool(list_tablets, false,
-            "Include tablet and replica UUIDs in the output");
+DECLARE_string(tables);
 
 namespace kudu {
 namespace tools {
 
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduColumnSchema;
-using client::KuduPredicate;
-using client::KuduScanner;
-using client::KuduScanToken;
-using client::KuduScanTokenBuilder;
-using client::KuduSchema;
-using client::KuduTable;
-using client::KuduTableAlterer;
-using client::internal::ReplicaController;
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using strings::Split;
-using strings::Substitute;
-
 // This class only exists so that ListTables() can easily be friended by
 // KuduReplica, KuduReplica::Data, and KuduClientBuilder.
 class TableLister {
@@ -391,6 +392,16 @@ Status ListTables(const RunnerContext& context) {
   return TableLister::ListTablets(Split(master_addresses_str, ","));
 }
 
+Status ScanTable(const RunnerContext &context) {
+  client::sp::shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(CreateKuduClient(context, &client));
+
+  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+
+  TableScanner scanner(client, table_name);
+  return scanner.Run();
+}
+
 } // anonymous namespace
 
 unique_ptr<Mode> BuildTableMode() {
@@ -452,6 +463,21 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("modify_external_catalogs")
       .Build();
 
+  unique_ptr<Action> scan_table =
+      ActionBuilder("scan", &ScanTable)
+      .Description("Scan rows from a table")
+      .ExtraDescription("Scan rows from an existing table. See the help "
+                        "for the --predicates flag on how predicates can be specified.")
+      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+      .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"})
+      .AddOptionalParameter("columns")
+      .AddOptionalParameter("fill_cache")
+      .AddOptionalParameter("num_threads")
+      .AddOptionalParameter("predicates")
+      .AddOptionalParameter("show_value")
+      .AddOptionalParameter("tablets")
+      .Build();
+
   return ModeBuilder("table")
       .Description("Operate on Kudu tables")
       .AddAction(std::move(delete_table))
@@ -460,6 +486,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddAction(std::move(locate_row))
       .AddAction(std::move(rename_column))
       .AddAction(std::move(rename_table))
+      .AddAction(std::move(scan_table))
       .Build();
 }
 


Mime
View raw message