kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [1/3] kudu git commit: [tools] ksck checksums: Add KsckChecksummer class
Date Wed, 03 Oct 2018 21:40:54 GMT
Repository: kudu
Updated Branches:
  refs/heads/master ec654c49f -> da28d0aee


[tools] ksck checksums: Add KsckChecksummer class

This moves the remaining checksum logic out of Ksck and into a new
KsckChecksummer class, along with refactoring some parts of the logic
into separate functions. There are no functional changes.

The logic that was not refactored is directly related to KUDU-2179, and
will be addressed in a follow-up patch.

Change-Id: I2016936eaa26fd6b499783e7d5d8f404816b37fa
Reviewed-on: http://gerrit.cloudera.org:8080/11498
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-by: Andrew Wong <awong@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/63e4a0a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/63e4a0a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/63e4a0a7

Branch: refs/heads/master
Commit: 63e4a0a700eeb2677f232bc472b36769be3e0382
Parents: ec654c4
Author: Will Berkeley <wdberkeley@gmail.com>
Authored: Fri Sep 21 16:45:35 2018 -0700
Committer: Will Berkeley <wdberkeley@gmail.com>
Committed: Wed Oct 3 18:04:36 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck.cc          | 191 ++-----------------------
 src/kudu/tools/ksck.h           |   6 +-
 src/kudu/tools/ksck_checksum.cc | 263 ++++++++++++++++++++++++++++++++++-
 src/kudu/tools/ksck_checksum.h  |  65 +++++++++
 4 files changed, 337 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index a5ed001..4597668 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -35,7 +35,6 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -43,9 +42,7 @@
 #include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/threadpool.h"
 
 #define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \
@@ -421,8 +418,13 @@ Status Ksck::Run() {
                       "table consistency check error");
 
   if (FLAGS_checksum_scan) {
-    PUSH_PREPEND_NOT_OK(ChecksumData(KsckChecksumOptions()),
-                        results_.error_messages, "checksum scan error");
+    // Copy the filters because they are passed by-value.
+    auto table_filters_for_checksum_opts = table_filters_;
+    auto tablet_id_filters_for_checksum_opts = tablet_id_filters_;
+    PUSH_PREPEND_NOT_OK(
+        ChecksumData(KsckChecksumOptions(std::move(table_filters_for_checksum_opts),
+                                         std::move(tablet_id_filters_for_checksum_opts))),
+        results_.error_messages, "checksum scan error");
   }
 
   // Use a special-case error if there are auth errors. This makes it harder
@@ -531,180 +533,13 @@ Status Ksck::CheckTablesConsistency() {
 }
 
 Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
-  // Copy options so that local modifications can be made and passed on.
-  KsckChecksumOptions options = opts;
-
-  typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap;
-  TabletTableMap tablet_table_map;
-
-  int num_tables = 0;
-  int num_tablets = 0;
-  int num_tablet_replicas = 0;
-  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
-    VLOG(1) << "Table: " << table->name();
-    if (!MatchesAnyPattern(table_filters_, table->name())) continue;
-    num_tables += 1;
-    num_tablets += table->tablets().size();
-    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
-      VLOG(1) << "Tablet: " << tablet->id();
-      if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue;
-      InsertOrDie(&tablet_table_map, tablet, table);
-      num_tablet_replicas += tablet->replicas().size();
-    }
-  }
-
-  if (num_tables == 0) {
-    string msg = "No table found.";
-    if (!table_filters_.empty()) {
-      msg += " Filter: table_filters=" + JoinStrings(table_filters_, ",");
-    }
-    return Status::NotFound(msg);
-  }
-
-  if (num_tablets > 0 && num_tablet_replicas == 0) {
-    // Warn if the table has tablets, but no replicas. The table may have no
-    // tablets if all range partitions have been dropped.
-    string msg = "No tablet replicas found.";
-    if (!table_filters_.empty() || !tablet_id_filters_.empty()) {
-      msg += " Filter: ";
-      if (!table_filters_.empty()) {
-        msg += "table_filters=" + JoinStrings(table_filters_, ",");
-      }
-      if (!tablet_id_filters_.empty()) {
-        msg += "tablet_id_filters=" + JoinStrings(tablet_id_filters_, ",");
-      }
-    }
-    return Status::NotFound(msg);
-  }
-
-  // Map of tablet servers to tablet queue.
-  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
-
-  TabletServerQueueMap tablet_server_queues;
-  scoped_refptr<ChecksumResultReporter> reporter(new ChecksumResultReporter(num_tablet_replicas));
-
-  // Create a queue of checksum callbacks grouped by the tablet server.
-  for (const TabletTableMap::value_type& entry : tablet_table_map) {
-    const shared_ptr<KsckTablet>& tablet = entry.first;
-    const shared_ptr<KsckTable>& table = entry.second;
-    for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
-      const shared_ptr<KsckTabletServer>& ts =
-          FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
-
-      const SharedTabletQueue& queue =
-          LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_tablet_replicas);
-      CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
-    }
-  }
-
-  if (options.use_snapshot &&
-      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-    // Set the snapshot timestamp to the current timestamp of the first healthy tablet server
-    // we can find.
-    for (const auto& ts : tablet_server_queues) {
-      if (ts.first->is_healthy()) {
-        options.snapshot_timestamp = ts.first->current_timestamp();
-        break;
-      }
-    }
-    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-      return Status::ServiceUnavailable(
-          "No tablet servers were available to fetch the current timestamp");
-    }
-    results_.checksum_results.snapshot_timestamp = options.snapshot_timestamp;
-  }
-
-  // Kick off checksum scans in parallel. For each tablet server, we start
-  // scan_concurrency scans. Each callback then initiates one additional
-  // scan when it returns if the queue for that TS is not empty.
-  for (const TabletServerQueueMap::value_type& entry : tablet_server_queues) {
-    const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
-    const SharedTabletQueue& queue = entry.second;
-    queue->Shutdown(); // Ensures that BlockingGet() will not block.
-    for (int i = 0; i < options.scan_concurrency; i++) {
-      std::pair<Schema, std::string> table_tablet;
-      if (queue->BlockingGet(&table_tablet)) {
-        const Schema& table_schema = table_tablet.first;
-        const std::string& tablet_id = table_tablet.second;
-        auto* cbs = new TabletServerChecksumCallbacks(
-            reporter, tablet_server, queue, tablet_id, options);
-        // 'cbs' deletes itself when complete.
-        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
-      }
-    }
-  }
-
-  // Don't ruin JSON output by printing progress updates.
+  KsckChecksummer checksummer(cluster_.get());
+  auto* checksum_results = &results_.checksum_results;
+  // Don't ruin JSON output with ad hoc progress updates.
   auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr;
-  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
-
-  // Even if we timed out, for printing collate the checksum results that we did get.
-  ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
-
-  int num_errors = 0;
-  int num_mismatches = 0;
-  int num_results = 0;
-  KsckTableChecksumMap checksum_tables;
-  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
-    KsckTableChecksum table_checksum;
-    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
-      if (ContainsKey(checksums, tablet->id())) {
-        KsckTabletChecksum tablet_checksum;
-        tablet_checksum.tablet_id = tablet->id();
-        bool seen_first_replica = false;
-        uint64_t first_checksum = 0;
-
-        for (const auto& r : FindOrDie(checksums, tablet->id())) {
-          KsckReplicaChecksum replica_checksum;
-          const string& replica_uuid = r.first;
-          shared_ptr<KsckTabletServer> ts = FindOrDie(cluster_->tablet_servers(), replica_uuid);
-          replica_checksum.ts_uuid = ts->uuid();
-          replica_checksum.ts_address = ts->address();
-
-          const ChecksumResultReporter::ResultPair& result = r.second;
-          const Status& status = result.first;
-          replica_checksum.checksum = result.second;
-          replica_checksum.status = status;
-          if (!status.ok()) {
-            num_errors++;
-          } else if (!seen_first_replica) {
-            seen_first_replica = true;
-            first_checksum = replica_checksum.checksum;
-          } else if (replica_checksum.checksum != first_checksum) {
-            num_mismatches++;
-            tablet_checksum.mismatch = true;
-          }
-          num_results++;
-          InsertOrDie(&tablet_checksum.replica_checksums,
-                      replica_checksum.ts_uuid,
-                      std::move(replica_checksum));
-        }
-        InsertOrDie(&table_checksum,
-                    tablet_checksum.tablet_id,
-                    std::move(tablet_checksum));
-      }
-    }
-    InsertOrDie(&checksum_tables, table->name(), std::move(table_checksum));
-  }
-  results_.checksum_results.tables.swap(checksum_tables);
-  if (timed_out) {
-    return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
-                                       "Received results for $1 out of $2 expected replicas",
-                                       options.timeout.ToString(), num_results,
-                                       num_tablet_replicas));
-  }
-  CHECK_EQ(num_results, num_tablet_replicas)
-      << Substitute("Unexpected error: only got $0 out of $1 replica results",
-                    num_results, num_tablet_replicas);
-
-  if (num_mismatches != 0) {
-    return Status::Corruption(Substitute("$0 checksum mismatches were detected.", num_mismatches));
-  }
-  if (num_errors != 0) {
-    return Status::Aborted(Substitute("$0 errors were detected", num_errors));
-  }
-
-  return Status::OK();
+  return checksummer.ChecksumData(opts,
+                                  checksum_results,
+                                  out_for_progress_updates);
 }
 
 bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 8e5e0b2..c654be0 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -412,15 +412,15 @@ class KsckCluster {
   // The table's tablet list is modified only if this method returns OK.
   virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) = 0;
 
-  const MasterList& masters() {
+  const MasterList& masters() const {
     return masters_;
   }
 
-  const TSMap& tablet_servers() {
+  const TSMap& tablet_servers() const {
     return tablet_servers_;
   }
 
-  const std::vector<std::shared_ptr<KsckTable>>& tables() {
+  const std::vector<std::shared_ptr<KsckTable>>& tables() const {
     return tables_;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
index 9d2b289..70d7c47 100644
--- a/src/kudu/tools/ksck_checksum.cc
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -20,22 +20,30 @@
 #include <algorithm>
 #include <cstdint>
 #include <iostream>
+#include <map>
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <glog/logging.h>
 
 #include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/tools/tool_action_common.h"
 
 using std::endl;
+using std::ostream;
 using std::shared_ptr;
 using std::string;
 using std::unordered_map;
+using std::vector;
 using strings::Substitute;
 
 DEFINE_int32(checksum_timeout_sec, 3600,
@@ -53,17 +61,40 @@ namespace kudu {
 namespace tools {
 
 KsckChecksumOptions::KsckChecksumOptions()
-    : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
-      scan_concurrency(FLAGS_checksum_scan_concurrency),
-      use_snapshot(FLAGS_checksum_snapshot),
-      snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {}
+    : KsckChecksumOptions({}, {}) {}
 
-KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, int scan_concurrency,
-                                         bool use_snapshot, uint64_t snapshot_timestamp)
+KsckChecksumOptions::KsckChecksumOptions(vector<string> table_filters,
+                                         vector<string> tablet_id_filters)
+    : KsckChecksumOptions(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec),
+                          FLAGS_checksum_scan_concurrency,
+                          FLAGS_checksum_snapshot,
+                          FLAGS_checksum_snapshot_timestamp,
+                          std::move(table_filters),
+                          std::move(tablet_id_filters)) {}
+
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         int scan_concurrency,
+                                         bool use_snapshot,
+                                         uint64_t snapshot_timestamp)
+    : KsckChecksumOptions(timeout,
+                          scan_concurrency,
+                          use_snapshot,
+                          snapshot_timestamp,
+                          {},
+                          {}) {}
+
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         int scan_concurrency,
+                                         bool use_snapshot,
+                                         uint64_t snapshot_timestamp,
+                                         vector<string> table_filters,
+                                         vector<string> tablet_id_filters)
     : timeout(timeout),
       scan_concurrency(scan_concurrency),
       use_snapshot(use_snapshot),
-      snapshot_timestamp(snapshot_timestamp) {}
+      snapshot_timestamp(snapshot_timestamp),
+      table_filters(std::move(table_filters)),
+      tablet_id_filters(std::move(tablet_id_filters)) {}
 
 ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas)
     : expected_count_(num_tablet_replicas),
@@ -153,5 +184,223 @@ void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t chec
   }
 }
 
+KsckChecksummer::KsckChecksummer(KsckCluster* cluster)
+    : cluster_(CHECK_NOTNULL(cluster)) {}
+
+Status KsckChecksummer::BuildTabletTableMap(
+    const KsckChecksumOptions& opts,
+    KsckChecksummer::TabletTableMap* tablet_table_map,
+    int* num_replicas) const {
+  CHECK(tablet_table_map);
+  CHECK(num_replicas);
+
+  TabletTableMap tablet_table_map_tmp;
+  int num_tables = 0;
+  int num_tablets = 0;
+  int num_replicas_tmp = 0;
+  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
+    VLOG(1) << "Table: " << table->name();
+    if (!MatchesAnyPattern(opts.table_filters, table->name())) continue;
+    num_tables += 1;
+    num_tablets += table->tablets().size();
+    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
+      VLOG(1) << "Tablet: " << tablet->id();
+      if (!MatchesAnyPattern(opts.tablet_id_filters, tablet->id())) continue;
+      InsertOrDie(&tablet_table_map_tmp, tablet, table);
+      num_replicas_tmp += tablet->replicas().size();
+    }
+  }
+
+  if (num_tables == 0) {
+    string msg = "No table found.";
+    if (!opts.table_filters.empty()) {
+      msg += " Filter: table_filters=" + JoinStrings(opts.table_filters, ",");
+    }
+    return Status::NotFound(msg);
+  }
+
+  if (num_tablets > 0 && num_replicas_tmp == 0) {
+    // Warn if the table has tablets, but no replicas. The table may have no
+    // tablets if all range partitions have been dropped.
+    string msg = "No tablet replicas found.";
+    if (!opts.table_filters.empty() || !opts.tablet_id_filters.empty()) {
+      msg += " Filter:";
+      if (!opts.table_filters.empty()) {
+        msg += " table_filters=" + JoinStrings(opts.table_filters, ",");
+      }
+      if (!opts.tablet_id_filters.empty()) {
+        msg += " tablet_id_filters=" + JoinStrings(opts.tablet_id_filters, ",");
+      }
+    }
+    return Status::NotFound(msg);
+  }
+
+  *tablet_table_map = std::move(tablet_table_map_tmp);
+  *num_replicas = num_replicas_tmp;
+  return Status::OK();
+}
+
+Status KsckChecksummer::CollateChecksumResults(
+    const ChecksumResultReporter::TabletResultMap& checksums,
+    KsckTableChecksumMap* table_checksum_map,
+    int* num_results) const {
+  CHECK(table_checksum_map);
+  CHECK(num_results);
+
+  table_checksum_map->clear();
+  *num_results = 0;
+  int num_errors = 0;
+  int num_mismatches = 0;
+  for (const auto& table : cluster_->tables()) {
+    KsckTableChecksum table_checksum;
+    for (const auto& tablet : table->tablets()) {
+      if (ContainsKey(checksums, tablet->id())) {
+        KsckTabletChecksum tablet_checksum;
+        tablet_checksum.tablet_id = tablet->id();
+        bool seen_first_replica = false;
+        uint64_t first_checksum = 0;
+
+        for (const auto& r : FindOrDie(checksums, tablet->id())) {
+          KsckReplicaChecksum replica_checksum;
+          const auto& replica_uuid = r.first;
+          const auto& ts = FindOrDie(cluster_->tablet_servers(), replica_uuid);
+          replica_checksum.ts_uuid = ts->uuid();
+          replica_checksum.ts_address = ts->address();
+
+          const ChecksumResultReporter::ResultPair& result = r.second;
+          const Status& status = result.first;
+          replica_checksum.checksum = result.second;
+          replica_checksum.status = status;
+          if (!status.ok()) {
+            num_errors++;
+          } else if (!seen_first_replica) {
+            seen_first_replica = true;
+            first_checksum = replica_checksum.checksum;
+          } else if (replica_checksum.checksum != first_checksum) {
+            num_mismatches++;
+            tablet_checksum.mismatch = true;
+          }
+          (*num_results)++;
+          EmplaceOrDie(&tablet_checksum.replica_checksums,
+                       replica_checksum.ts_uuid,
+                       std::move(replica_checksum));
+        }
+        EmplaceOrDie(&table_checksum,
+                     tablet_checksum.tablet_id,
+                     std::move(tablet_checksum));
+      }
+    }
+    EmplaceOrDie(table_checksum_map, table->name(), std::move(table_checksum));
+  }
+
+  if (num_mismatches != 0) {
+    return Status::Corruption(Substitute("$0 checksum mismatches were detected.",
+                                         num_mismatches));
+  }
+  if (num_errors != 0) {
+    return Status::Aborted(Substitute("$0 errors were detected", num_errors));
+  }
+  return Status::OK();
+}
+
+Status KsckChecksummer::ChecksumData(const KsckChecksumOptions& opts,
+                                     KsckChecksumResults* checksum_results,
+                                     ostream* out_for_progress_updates) {
+  CHECK(checksum_results);
+
+  // Make a copy of 'opts' because we may need to assign a snapshot timestamp
+  // if one was not provided.
+  KsckChecksumOptions options = opts;
+
+  // Clear the contents of 'checksum_results' because we always overwrite it
+  // with whatever results are obtained (and with nothing if there's no results).
+  checksum_results->snapshot_timestamp = boost::none;
+  checksum_results->tables.clear();
+
+  TabletTableMap tablet_table_map;
+  int num_replicas;
+  RETURN_NOT_OK(BuildTabletTableMap(options, &tablet_table_map, &num_replicas));
+
+  // Map of tablet servers to tablet queue.
+  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
+
+  TabletServerQueueMap tablet_server_queues;
+  scoped_refptr<ChecksumResultReporter> reporter(
+      new ChecksumResultReporter(num_replicas));
+
+  // Create a queue of checksum callbacks grouped by the tablet server.
+  for (const auto& entry : tablet_table_map) {
+    const shared_ptr<KsckTablet>& tablet = entry.first;
+    const shared_ptr<KsckTable>& table = entry.second;
+    for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
+      const shared_ptr<KsckTabletServer>& ts =
+          FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
+
+      const SharedTabletQueue& queue =
+          LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_replicas);
+      CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
+    }
+  }
+
+  // Set the snapshot timestamp. If the sentinel value 'kCurrentTimestamp' was
+  // provided, the snapshot timestamp is set to the current timestamp of some
+  // healthy tablet server.
+  if (options.use_snapshot &&
+      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+    for (const auto& ts : tablet_server_queues) {
+      if (ts.first->is_healthy()) {
+        options.snapshot_timestamp = ts.first->current_timestamp();
+        break;
+      }
+    }
+    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+      return Status::ServiceUnavailable(
+          "No tablet servers were available to fetch the current timestamp");
+    }
+    checksum_results->snapshot_timestamp = options.snapshot_timestamp;
+  }
+
+  // Kick off checksum scans in parallel. For each tablet server, we start
+  // 'options.scan_concurrency' scans. Each callback then initiates one
+  // additional scan when it returns if the queue for that TS is not empty.
+  for (const auto& entry : tablet_server_queues) {
+    const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
+    const SharedTabletQueue& queue = entry.second;
+    queue->Shutdown(); // Ensures that BlockingGet() will not block.
+    for (int i = 0; i < options.scan_concurrency; i++) {
+      std::pair<Schema, std::string> table_tablet;
+      if (queue->BlockingGet(&table_tablet)) {
+        const Schema& table_schema = table_tablet.first;
+        const std::string& tablet_id = table_tablet.second;
+        auto* cbs = new TabletServerChecksumCallbacks(
+            reporter, tablet_server, queue, tablet_id, options);
+        // 'cbs' deletes itself when complete.
+        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
+      }
+    }
+  }
+
+  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
+
+  // Even if we timed out, collate the checksum results that we did get.
+  KsckTableChecksumMap checksum_table_map;
+  int num_results;
+  const Status s = CollateChecksumResults(reporter->checksums(),
+                                          &checksum_table_map,
+                                          &num_results);
+  checksum_results->tables = std::move(checksum_table_map);
+
+  if (timed_out) {
+    return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
+                                       "Received results for $1 out of $2 expected replicas",
+                                       options.timeout.ToString(), num_results,
+                                       num_replicas));
+  }
+  CHECK_EQ(num_results, num_replicas)
+      << Substitute("Unexpected error: only got $0 out of $1 replica results",
+                    num_results, num_replicas);
+  return s;
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h
index 7285f92..8fbb2f1 100644
--- a/src/kudu/tools/ksck_checksum.h
+++ b/src/kudu/tools/ksck_checksum.h
@@ -24,8 +24,11 @@
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/tools/ksck_results.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/countdown_latch.h"
@@ -39,6 +42,9 @@ class Schema;
 
 namespace tools {
 
+class KsckCluster;
+class KsckTable;
+class KsckTablet;
 class KsckTabletServer;
 
 // Options for checksum scans.
@@ -49,11 +55,21 @@ struct KsckChecksumOptions {
 
   KsckChecksumOptions();
 
+  KsckChecksumOptions(std::vector<std::string> table_filters,
+                      std::vector<std::string> tablet_id_filters);
+
   KsckChecksumOptions(MonoDelta timeout,
                       int scan_concurrency,
                       bool use_snapshot,
                       uint64_t snapshot_timestamp);
 
+  KsckChecksumOptions(MonoDelta timeout,
+                      int scan_concurrency,
+                      bool use_snapshot,
+                      uint64_t snapshot_timestamp,
+                      std::vector<std::string> table_filters,
+                      std::vector<std::string> tablet_id_filters);
+
   // The maximum total time to wait for results to come back from all replicas.
   MonoDelta timeout;
 
@@ -65,6 +81,11 @@ struct KsckChecksumOptions {
 
   // The snapshot timestamp to use for snapshot checksum scans.
   uint64_t snapshot_timestamp;
+
+  // Filters for the table names and tablet ids whose contents should be
+  // checksummed.
+  std::vector<std::string> table_filters;
+  std::vector<std::string> tablet_id_filters;
 };
 
 // Interface for reporting progress on checksumming a single
@@ -152,6 +173,8 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
   void Finished(const Status& status, uint64_t checksum) override;
 
  private:
+  ~TabletServerChecksumCallbacks() = default;
+
   const scoped_refptr<ChecksumResultReporter> reporter_;
   const std::shared_ptr<KsckTabletServer> tablet_server_;
   const SharedTabletQueue queue_;
@@ -159,5 +182,47 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
 
   std::string tablet_id_;
 };
+
+// A class for performing checksum scans against a Kudu cluster.
+class KsckChecksummer {
+ public:
+   // 'cluster' must remain valid as long as this instance is alive.
+  explicit KsckChecksummer(KsckCluster* cluster);
+
+  // Checksum the data in the Kudu cluster according to the options provided in
+  // 'opts'. Results will be populated in the 'checksum_results'. If non-null,
+  // progress updates will be written to 'out_for_progress_updates'.
+  // NOTE: Even if this method returns a bad Status, 'checksum_results' will be
+  // populated with whatever checksum results were received.
+  Status ChecksumData(const KsckChecksumOptions& opts,
+                      KsckChecksumResults* checksum_results,
+                      std::ostream* out_for_progress_updates);
+
+ private:
+  typedef std::unordered_map<std::shared_ptr<KsckTablet>,
+                             std::shared_ptr<KsckTable>> TabletTableMap;
+
+  // Builds a mapping from tablet-to-be-checksummed to its table, which is
+  // used to create checksum callbacks. This mapping is returned in
+  // 'tablet_table_map' and the total number of replicas to be checksummed is
+  // returned in 'num_replicas'.
+  Status BuildTabletTableMap(const KsckChecksumOptions& opts,
+                             TabletTableMap* tablet_table_map,
+                             int* num_replicas) const;
+
+  // Collates the results of checksums into 'table_checksum_map', with the
+  // total number of results returned as 'num_results'.
+  // NOTE: Even if this function returns a bad Status, 'table_checksum_map'
+  // and 'num_results' will still be populated using whatever results are
+  // available.
+  Status CollateChecksumResults(
+      const ChecksumResultReporter::TabletResultMap& checksums,
+      KsckTableChecksumMap* table_checksum_map,
+      int* num_results) const;
+
+  KsckCluster* cluster_;
+
+  DISALLOW_COPY_AND_ASSIGN(KsckChecksummer);
+};
 } // namespace tools
 } // namespace kudu


Mime
View raw message