kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [2/3] kudu git commit: [tools] ksck checksums: Factor out of main ksck code
Date Tue, 02 Oct 2018 18:52:26 GMT
[tools] ksck checksums: Factor out of main ksck code

To prepare to address KUDU-2179 and to improve the organization of the
code, this patch factors checksum-related classes and code out of
ksck.{cc,h} into a new pair of files ksck_checksum.{cc,h}. Following
the pattern of other ksck-related classes, it also renames some classes
so they start with "Ksck".

There are no functional changes in this patch.

Change-Id: I4bb1f51af22ab0c6c20b9426dbb62ea48413ed5b
Reviewed-on: http://gerrit.cloudera.org:8080/11488
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f5f57f5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f5f57f5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f5f57f5

Branch: refs/heads/master
Commit: 7f5f57f5d73c3b583478210d9b704d7b3793ffe6
Parents: 9f9070a
Author: Will Berkeley <wdberkeley@gmail.org>
Authored: Fri Sep 7 16:05:55 2018 -0700
Committer: Will Berkeley <wdberkeley@gmail.com>
Committed: Tue Oct 2 14:38:54 2018 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/cluster_verifier.cc |   2 +-
 src/kudu/integration-tests/cluster_verifier.h  |   8 +-
 src/kudu/tools/CMakeLists.txt                  |   1 +
 src/kudu/tools/ksck-test.cc                    |   5 +-
 src/kudu/tools/ksck.cc                         | 182 ++------------------
 src/kudu/tools/ksck.h                          |  54 +-----
 src/kudu/tools/ksck_checksum.cc                | 157 +++++++++++++++++
 src/kudu/tools/ksck_checksum.h                 | 163 ++++++++++++++++++
 src/kudu/tools/ksck_remote-test.cc             |  11 +-
 src/kudu/tools/ksck_remote.cc                  |  11 +-
 src/kudu/tools/ksck_remote.h                   |   6 +-
 11 files changed, 363 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index 8f54554..6a19581 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -31,6 +31,7 @@
 #include "kudu/integration-tests/log_verifier.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/ksck_remote.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -51,7 +52,6 @@ using tools::RemoteKsckCluster;
 
 ClusterVerifier::ClusterVerifier(MiniCluster* cluster)
     : cluster_(cluster),
-      checksum_options_(tools::ChecksumOptions()),
       operations_timeout_(MonoDelta::FromSeconds(60)) {
   checksum_options_.use_snapshot = false;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/integration-tests/cluster_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.h b/src/kudu/integration-tests/cluster_verifier.h
index 533a707..f95e742 100644
--- a/src/kudu/integration-tests/cluster_verifier.h
+++ b/src/kudu/integration-tests/cluster_verifier.h
@@ -19,9 +19,9 @@
 #include <string>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/tools/ksck.h"
-#include "kudu/util/status.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 
@@ -87,9 +87,9 @@ class ClusterVerifier {
                          int expected_row_count);
 
 
-  cluster::MiniCluster* cluster_;
+  tools::KsckChecksumOptions checksum_options_;
 
-  tools::ChecksumOptions checksum_options_;
+  cluster::MiniCluster* cluster_;
 
   MonoDelta operations_timeout_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 97310f9..c9c7db3 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -66,6 +66,7 @@ target_link_libraries(kudu_tools_util
 
 add_library(ksck
     ksck.cc
+    ksck_checksum.cc
     ksck_remote.cc
     ksck_results.cc
 )

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 273be4b..a07554e 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -43,6 +43,7 @@
 #include "kudu/server/server_base.pb.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.pb.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/ksck_results.h"
 #include "kudu/util/jsonreader.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -144,8 +145,8 @@ class MockKsckTabletServer : public KsckTabletServer {
   virtual void RunTabletChecksumScanAsync(
       const std::string& /*tablet_id*/,
       const Schema& /*schema*/,
-      const ChecksumOptions& /*options*/,
-      ChecksumProgressCallbacks* callbacks) OVERRIDE {
+      const KsckChecksumOptions& /*options*/,
+      KsckChecksumProgressCallbacks* callbacks) OVERRIDE {
     callbacks->Progress(10, 20);
     callbacks->Finished(Status::OK(), 0);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index c426d38..a5ed001 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -36,15 +36,14 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tools/color.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
-#include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/threadpool.h"
@@ -58,17 +57,6 @@
 
 DEFINE_bool(checksum_scan, false,
             "Perform a checksum scan on data in the cluster.");
-DEFINE_int32(checksum_timeout_sec, 3600,
-             "Maximum total seconds to wait for a checksum scan to complete "
-             "before timing out.");
-DEFINE_int32(checksum_scan_concurrency, 4,
-             "Number of concurrent checksum scans to execute per tablet server.");
-DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan");
-DEFINE_uint64(checksum_snapshot_timestamp,
-              kudu::tools::ChecksumOptions::kCurrentTimestamp,
-              "timestamp to use for snapshot checksum scans, defaults to 0, which "
-              "uses the current timestamp of a tablet server involved in the scan");
-
 DEFINE_int32(fetch_replica_info_concurrency, 20,
              "Number of concurrent tablet servers to fetch replica info from.");
 
@@ -140,22 +128,6 @@ bool IsNonJSONFormat() {
 
 } // anonymous namespace
 
-ChecksumOptions::ChecksumOptions()
-    : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
-      scan_concurrency(FLAGS_checksum_scan_concurrency),
-      use_snapshot(FLAGS_checksum_snapshot),
-      snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {
-}
-
-ChecksumOptions::ChecksumOptions(MonoDelta timeout, int scan_concurrency,
-                                 bool use_snapshot, uint64_t snapshot_timestamp)
-    : timeout(timeout),
-      scan_concurrency(scan_concurrency),
-      use_snapshot(use_snapshot),
-      snapshot_timestamp(snapshot_timestamp) {}
-
-const uint64_t ChecksumOptions::kCurrentTimestamp = 0;
-
 tablet::TabletStatePB KsckTabletServer::ReplicaState(const std::string& tablet_id) const
{
   CHECK_EQ(state_, KsckFetchState::FETCHED);
   if (!ContainsKey(tablet_status_map_, tablet_id)) {
@@ -449,7 +421,7 @@ Status Ksck::Run() {
                       "table consistency check error");
 
   if (FLAGS_checksum_scan) {
-    PUSH_PREPEND_NOT_OK(ChecksumData(ChecksumOptions()),
+    PUSH_PREPEND_NOT_OK(ChecksumData(KsckChecksumOptions()),
                         results_.error_messages, "checksum scan error");
   }
 
@@ -558,146 +530,9 @@ Status Ksck::CheckTablesConsistency() {
   return Status::OK();
 }
 
-// Class to act as a collector of scan results.
-// Provides thread-safe accessors to update and read a hash table of results.
-class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter>
{
- public:
-  typedef std::pair<Status, uint64_t> ResultPair;
-  typedef std::unordered_map<std::string, ResultPair> ReplicaResultMap;
-  typedef std::unordered_map<std::string, ReplicaResultMap> TabletResultMap;
-
-  // Initialize reporter with the number of replicas being queried.
-  explicit ChecksumResultReporter(int num_tablet_replicas)
-      : expected_count_(num_tablet_replicas),
-        responses_(num_tablet_replicas),
-        rows_summed_(0),
-        disk_bytes_summed_(0) {
-  }
-
-  void ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
-    rows_summed_.IncrementBy(delta_rows);
-    disk_bytes_summed_.IncrementBy(delta_bytes);
-  }
-
-  // Write an entry to the result map indicating a response from the remote.
-  void ReportResult(const std::string& tablet_id,
-                    const std::string& replica_uuid,
-                    const Status& status,
-                    uint64_t checksum) {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    unordered_map<string, ResultPair>& replica_results =
-        LookupOrInsert(&checksums_, tablet_id, unordered_map<string, ResultPair>());
-    InsertOrDie(&replica_results, replica_uuid, ResultPair(status, checksum));
-    responses_.CountDown();
-  }
-
-  // Blocks until either the number of results plus errors reported equals
-  // num_tablet_replicas (from the constructor), or until the timeout expires,
-  // whichever comes first. Progress messages are printed to 'out'.
-  // Returns false if the timeout expired before all responses came in.
-  // Otherwise, returns true.
-  bool WaitFor(const MonoDelta& timeout, std::ostream* out) const {
-    MonoTime start = MonoTime::Now();
-    MonoTime deadline = start + timeout;
-
-    bool done = false;
-    while (!done) {
-      MonoTime now = MonoTime::Now();
-      int rem_ms = (deadline - now).ToMilliseconds();
-      if (rem_ms <= 0) return false;
-
-      done = responses_.WaitFor(MonoDelta::FromMilliseconds(std::min(rem_ms, 5000)));
-      string status = done ? "finished in " : "running for ";
-      if (IsNonJSONFormat()) {
-        int run_time_sec = (MonoTime::Now() - start).ToSeconds();
-        (*out) << "Checksum " << status << run_time_sec << "s: "
-               << responses_.count() << "/" << expected_count_ <<
" replicas remaining ("
-               << HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()) <<
" from disk, "
-               << HumanReadableInt::ToString(rows_summed_.Load()) << " rows summed)"
-               << endl;
-      }
-    }
-    return true;
-  }
-
-  // Returns true iff all replicas have reported in.
-  bool AllReported() const { return responses_.count() == 0; }
-
-  // Get reported results.
-  TabletResultMap checksums() const {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    return checksums_;
-  }
-
- private:
-  friend class RefCountedThreadSafe<ChecksumResultReporter>;
-  ~ChecksumResultReporter() {}
-
-  // Report either a success or error response.
-  void HandleResponse(const std::string& tablet_id, const std::string& replica_uuid,
-                      const Status& status, uint64_t checksum);
-
-  const int expected_count_;
-  CountDownLatch responses_;
-
-  mutable simple_spinlock lock_; // Protects 'checksums_'.
-  // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
-  TabletResultMap checksums_;
-
-  AtomicInt<int64_t> rows_summed_;
-  AtomicInt<int64_t> disk_bytes_summed_;
-};
-
-// Queue of tablet replicas for an individual tablet server.
-typedef shared_ptr<BlockingQueue<std::pair<Schema, std::string>>> SharedTabletQueue;
-
-// A set of callbacks which records the result of a tablet replica's checksum,
-// and then checks if the tablet server has any more tablets to checksum. If so,
-// a new async checksum scan is started.
-class TabletServerChecksumCallbacks : public ChecksumProgressCallbacks {
- public:
-  TabletServerChecksumCallbacks(
-    scoped_refptr<ChecksumResultReporter> reporter,
-    shared_ptr<KsckTabletServer> tablet_server,
-    SharedTabletQueue queue,
-    std::string tablet_id,
-    ChecksumOptions options) :
-      reporter_(std::move(reporter)),
-      tablet_server_(std::move(tablet_server)),
-      queue_(std::move(queue)),
-      options_(options),
-      tablet_id_(std::move(tablet_id)) {
-  }
-
-  void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override {
-    reporter_->ReportProgress(rows_summed, disk_bytes_summed);
-  }
-
-  void Finished(const Status& status, uint64_t checksum) override {
-    reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
-
-    std::pair<Schema, std::string> table_tablet;
-    if (queue_->BlockingGet(&table_tablet)) {
-      const Schema& table_schema = table_tablet.first;
-      tablet_id_ = table_tablet.second;
-      tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
-    } else {
-      delete this;
-    }
-  }
-
- private:
-  const scoped_refptr<ChecksumResultReporter> reporter_;
-  const shared_ptr<KsckTabletServer> tablet_server_;
-  const SharedTabletQueue queue_;
-  const ChecksumOptions options_;
-
-  std::string tablet_id_;
-};
-
-Status Ksck::ChecksumData(const ChecksumOptions& opts) {
+Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
   // Copy options so that local modifications can be made and passed on.
-  ChecksumOptions options = opts;
+  KsckChecksumOptions options = opts;
 
   typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>>
TabletTableMap;
   TabletTableMap tablet_table_map;
@@ -762,7 +597,8 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
     }
   }
 
-  if (options.use_snapshot && options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp)
{
+  if (options.use_snapshot &&
+      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
     // Set the snapshot timestamp to the current timestamp of the first healthy tablet server
     // we can find.
     for (const auto& ts : tablet_server_queues) {
@@ -771,7 +607,7 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
         break;
       }
     }
-    if (options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
+    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
       return Status::ServiceUnavailable(
           "No tablet servers were available to fetch the current timestamp");
     }
@@ -798,7 +634,9 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
     }
   }
 
-  bool timed_out = !reporter->WaitFor(options.timeout, out_);
+  // Don't ruin JSON output by printing progress updates.
+  auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr;
+  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
 
   // Even if we timed out, for printing collate the checksum results that we did get.
   ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 3857b4d..8e5e0b2 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -41,40 +41,17 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.pb.h"  // IWYU pragma: keep
 #include "kudu/tools/ksck_results.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-namespace tools {
-
-class KsckTable;
-
-// Options for checksum scans.
-struct ChecksumOptions {
- public:
-
-  ChecksumOptions();
-
-  ChecksumOptions(MonoDelta timeout,
-                  int scan_concurrency,
-                  bool use_snapshot,
-                  uint64_t snapshot_timestamp);
-
-  // The maximum total time to wait for results to come back from all replicas.
-  MonoDelta timeout;
-
-  // The maximum number of concurrent checksum scans to run per tablet server.
-  int scan_concurrency;
 
-  // Whether to use a snapshot checksum scanner.
-  bool use_snapshot;
+class MonoDelta;
 
-  // The snapshot timestamp to use for snapshot checksum scans.
-  uint64_t snapshot_timestamp;
+namespace tools {
 
-  // A timestamp indicating that the current time should be used for a checksum snapshot.
-  static const uint64_t kCurrentTimestamp;
-};
+class KsckChecksumProgressCallbacks;
+class KsckTable;
+struct KsckChecksumOptions;
 
 // Representation of a tablet replica on a tablet server.
 class KsckTabletReplica {
@@ -178,21 +155,6 @@ class KsckTable {
   DISALLOW_COPY_AND_ASSIGN(KsckTable);
 };
 
-// Interface for reporting progress on checksumming a single
-// replica.
-class ChecksumProgressCallbacks {
- public:
-  virtual ~ChecksumProgressCallbacks() {}
-
-  // Report incremental progress from the server side.
-  // 'disk_bytes_summed' only counts data read from DiskRowSets on the server side
-  // and does not count MRS bytes, etc.
-  virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
-
-  // The scan of the current tablet is complete.
-  virtual void Finished(const Status& status, uint64_t checksum) = 0;
-};
-
 // Enum representing the fetch status of a ksck master or tablet server.
 enum class KsckFetchState {
   // Information has not yet been fetched.
@@ -336,8 +298,8 @@ class KsckTabletServer {
   virtual void RunTabletChecksumScanAsync(
                   const std::string& tablet_id,
                   const Schema& schema,
-                  const ChecksumOptions& options,
-                  ChecksumProgressCallbacks* callbacks) = 0;
+                  const KsckChecksumOptions& options,
+                  KsckChecksumProgressCallbacks* callbacks) = 0;
 
   virtual const std::string& uuid() const {
     return uuid_;
@@ -550,7 +512,7 @@ class Ksck {
 
   // Verifies data checksums on all tablets by doing a scan of the database on each replica.
   // Must first call FetchTableAndTabletInfo().
-  Status ChecksumData(const ChecksumOptions& options);
+  Status ChecksumData(const KsckChecksumOptions& opts);
 
   // Runs all the checks of ksck in the proper order, including checksum scans,
   // if enabled. Returns OK if and only if all checks succeed.

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_checksum.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
new file mode 100644
index 0000000..9d2b289
--- /dev/null
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/ksck_checksum.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include <gflags/gflags.h>
+
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/ksck.h"
+
+using std::endl;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using strings::Substitute;
+
+DEFINE_int32(checksum_timeout_sec, 3600,
+             "Maximum total seconds to wait for a checksum scan to complete "
+             "before timing out.");
+DEFINE_int32(checksum_scan_concurrency, 4,
+             "Number of concurrent checksum scans to execute per tablet server.");
+DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan?");
+DEFINE_uint64(checksum_snapshot_timestamp,
+              kudu::tools::KsckChecksumOptions::kCurrentTimestamp,
+              "Timestamp to use for snapshot checksum scans. Defaults to 0, which "
+              "uses the current timestamp of a tablet server involved in the scan.");
+
+namespace kudu {
+namespace tools {
+
+KsckChecksumOptions::KsckChecksumOptions()
+    : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
+      scan_concurrency(FLAGS_checksum_scan_concurrency),
+      use_snapshot(FLAGS_checksum_snapshot),
+      snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {}
+
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, int scan_concurrency,
+                                         bool use_snapshot, uint64_t snapshot_timestamp)
+    : timeout(timeout),
+      scan_concurrency(scan_concurrency),
+      use_snapshot(use_snapshot),
+      snapshot_timestamp(snapshot_timestamp) {}
+
+ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas)
+    : expected_count_(num_tablet_replicas),
+      responses_(num_tablet_replicas),
+      rows_summed_(0),
+      disk_bytes_summed_(0) {}
+
+void ChecksumResultReporter::ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
+  rows_summed_.IncrementBy(delta_rows);
+  disk_bytes_summed_.IncrementBy(delta_bytes);
+}
+
+// Write an entry to the result map indicating a response from the remote.
+void ChecksumResultReporter::ReportResult(const string& tablet_id,
+                                          const string& replica_uuid,
+                                          const Status& status,
+                                          uint64_t checksum) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  unordered_map<string, ResultPair>& replica_results =
+      LookupOrInsert(&checksums_, tablet_id, unordered_map<string, ResultPair>());
+  InsertOrDie(&replica_results, replica_uuid, ResultPair(status, checksum));
+  responses_.CountDown();
+}
+
+// Blocks until either the number of results plus errors reported equals
+// num_tablet_replicas (from the constructor), or until the timeout expires,
+// whichever comes first. Progress messages are printed to 'out'.
+// Returns false if the timeout expired before all responses came in.
+// Otherwise, returns true.
+// Print progress updates to 'out' if it is non-null.
+bool ChecksumResultReporter::WaitFor(const MonoDelta& timeout, std::ostream* out) const
{
+  MonoTime start = MonoTime::Now();
+  MonoTime deadline = start + timeout;
+
+  bool done = false;
+  while (!done) {
+    MonoTime now = MonoTime::Now();
+    auto rem_ms = (deadline - now).ToMilliseconds();
+    if (rem_ms <= 0) return false;
+
+    constexpr int64_t max_wait_ms = 5000;
+    done = responses_.WaitFor(
+        MonoDelta::FromMilliseconds(std::min(rem_ms, max_wait_ms)));
+    if (out) {
+      string status = done ? "finished in" : "running for";
+      int run_time_sec = (MonoTime::Now() - start).ToSeconds();
+      (*out) << Substitute("Checksum $0 $1s: $2/$3 replicas remaining "
+                           "($4 from disk, $5 rows summed)",
+                           status,
+                           run_time_sec,
+                           responses_.count(),
+                           expected_count_,
+                           HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()),
+                           HumanReadableInt::ToString(rows_summed_.Load()))
+             << endl;
+    }
+  }
+  return true;
+}
+
+TabletServerChecksumCallbacks::TabletServerChecksumCallbacks(
+    scoped_refptr<ChecksumResultReporter> reporter,
+    shared_ptr<KsckTabletServer> tablet_server,
+    SharedTabletQueue queue,
+    string tablet_id,
+    KsckChecksumOptions options)
+    : reporter_(std::move(reporter)),
+      tablet_server_(std::move(tablet_server)),
+      queue_(std::move(queue)),
+      options_(options),
+      tablet_id_(std::move(tablet_id)) {}
+
+void TabletServerChecksumCallbacks::Progress(int64_t rows_summed, int64_t disk_bytes_summed)
{
+  reporter_->ReportProgress(rows_summed, disk_bytes_summed);
+}
+
+void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t checksum)
{
+  reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
+
+  std::pair<Schema, string> table_tablet;
+  if (queue_->BlockingGet(&table_tablet)) {
+    const Schema& table_schema = table_tablet.first;
+    tablet_id_ = table_tablet.second;
+    tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
+  } else {
+    delete this;
+  }
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_checksum.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h
new file mode 100644
index 0000000..7285f92
--- /dev/null
+++ b/src/kudu/tools/ksck_checksum.h
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/blocking_queue.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Schema;
+
+namespace tools {
+
+class KsckTabletServer;
+
+// Options for checksum scans.
+struct KsckChecksumOptions {
+  // A checksum with this special timestamp will use a timestamp selected by
+  // one of tablet servers performing the snapshot scan.
+  static constexpr uint64_t kCurrentTimestamp = 0;
+
+  KsckChecksumOptions();
+
+  KsckChecksumOptions(MonoDelta timeout,
+                      int scan_concurrency,
+                      bool use_snapshot,
+                      uint64_t snapshot_timestamp);
+
+  // The maximum total time to wait for results to come back from all replicas.
+  MonoDelta timeout;
+
+  // The maximum number of concurrent checksum scans to run per tablet server.
+  int scan_concurrency;
+
+  // Whether to use a snapshot checksum scanner.
+  bool use_snapshot;
+
+  // The snapshot timestamp to use for snapshot checksum scans.
+  uint64_t snapshot_timestamp;
+};
+
+// Interface for reporting progress on checksumming a single
+// replica.
+class KsckChecksumProgressCallbacks {
+ public:
+  virtual ~KsckChecksumProgressCallbacks() {}
+
+  // Report incremental progress from the server side.
+  // 'delta_disk_bytes_summed' only counts data read from DiskRowSets on the
+  // server side and does not count MRS bytes, etc.
+  virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
+
+  // The scan of the current tablet is complete.
+  virtual void Finished(const Status& status, uint64_t checksum) = 0;
+};
+
+// Class to act as a collector of scan results.
+// Provides thread-safe accessors to update and read a hash table of results.
+class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter>
{
+ public:
+  typedef std::pair<Status, uint64_t> ResultPair;
+  typedef std::unordered_map<std::string, ResultPair> ReplicaResultMap;
+  typedef std::unordered_map<std::string, ReplicaResultMap> TabletResultMap;
+
+  // Initialize reporter with the number of replicas being queried.
+  explicit ChecksumResultReporter(int num_tablet_replicas);
+
+  void ReportProgress(int64_t delta_rows, int64_t delta_bytes);
+
+  // Write an entry to the result map indicating a response from the remote.
+  void ReportResult(const std::string& tablet_id,
+                    const std::string& replica_uuid,
+                    const Status& status,
+                    uint64_t checksum);
+
+  // Blocks until either the number of results plus errors reported equals
+  // num_tablet_replicas (from the constructor), or until the timeout expires,
+  // whichever comes first. Progress messages are printed to 'out'.
+  // Returns false if the timeout expired before all responses came in.
+  // Otherwise, returns true.
+  bool WaitFor(const MonoDelta& timeout, std::ostream* out) const;
+
+  // Returns true iff all replicas have reported in.
+  bool AllReported() const { return responses_.count() == 0; }
+
+  // Get reported results.
+  TabletResultMap checksums() const {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return checksums_;
+  }
+
+ private:
+  friend class RefCountedThreadSafe<ChecksumResultReporter>;
+  ~ChecksumResultReporter() {}
+
+  const int expected_count_;
+  CountDownLatch responses_;
+
+  mutable simple_spinlock lock_; // Protects 'checksums_'.
+  // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
+  TabletResultMap checksums_;
+
+  AtomicInt<int64_t> rows_summed_;
+  AtomicInt<int64_t> disk_bytes_summed_;
+};
+
+// Queue of tablet replicas for an individual tablet server.
+typedef std::shared_ptr<BlockingQueue<std::pair<Schema, std::string>>>
SharedTabletQueue;
+
+// A set of callbacks which records the result of a tablet replica's checksum,
+// and then checks if the tablet server has any more tablets to checksum. If so,
+// a new async checksum scan is started.
+class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
+ public:
+  TabletServerChecksumCallbacks(
+      scoped_refptr<ChecksumResultReporter> reporter,
+      std::shared_ptr<KsckTabletServer> tablet_server,
+      SharedTabletQueue queue,
+      std::string tablet_id,
+      KsckChecksumOptions options);
+
+  void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override;
+
+  void Finished(const Status& status, uint64_t checksum) override;
+
+ private:
+  const scoped_refptr<ChecksumResultReporter> reporter_;
+  const std::shared_ptr<KsckTabletServer> tablet_server_;
+  const SharedTabletQueue queue_;
+  const KsckChecksumOptions options_;
+
+  std::string tablet_id_;
+};
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index 456544f..f41298c 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -43,6 +43,7 @@
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tools/data_gen_util.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/countdown_latch.h"
@@ -318,7 +319,7 @@ TEST_F(RemoteKsckTest, TestChecksum) {
     ASSERT_OK(ksck_->FetchInfoFromTabletServers());
 
     err_stream_.str("");
-    s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
+    s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(1), 16, false,
0));
     if (s.ok()) {
       // Check the status message at the end of the checksum.
       // We expect '0B from disk' because we didn't write enough data to trigger a flush
@@ -342,7 +343,7 @@ TEST_F(RemoteKsckTest, TestChecksumTimeout) {
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
   // Use an impossibly low timeout value of zero!
-  Status s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false,
0));
+  Status s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromNanoseconds(0), 16,
false, 0));
   ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString();
 }
 
@@ -365,7 +366,7 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
   ASSERT_OK(ksck_->CheckClusterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
-  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
ts)));
+  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
ts)));
   continue_writing.Store(false);
   ASSERT_OK(promise.Get());
   writer_thread->Join();
@@ -391,8 +392,8 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshotCurrentTimestamp) {
   ASSERT_OK(ksck_->CheckClusterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
-  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
-                                                ChecksumOptions::kCurrentTimestamp)));
+  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
+                                                    KsckChecksumOptions::kCurrentTimestamp)));
   continue_writing.Store(false);
   ASSERT_OK(promise.Get());
   writer_thread->Join();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 98b2b0a..c44922b 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -52,6 +52,7 @@
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/ksck_results.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tserver.pb.h"
@@ -295,7 +296,7 @@ class ChecksumCallbackHandler {
 class ChecksumStepper {
  public:
   ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
-                  ChecksumOptions options, ChecksumProgressCallbacks* callbacks,
+                  KsckChecksumOptions options, KsckChecksumProgressCallbacks* callbacks,
                   shared_ptr<tserver::TabletServerServiceProxy> proxy)
       : schema_(schema),
         tablet_id_(std::move(tablet_id)),
@@ -396,8 +397,8 @@ class ChecksumStepper {
 
   const string tablet_id_;
   const string server_uuid_;
-  const ChecksumOptions options_;
-  ChecksumProgressCallbacks* const callbacks_;
+  const KsckChecksumOptions options_;
+  KsckChecksumProgressCallbacks* const callbacks_;
   const shared_ptr<tserver::TabletServerServiceProxy> proxy_;
 
   uint32_t call_seq_id_;
@@ -416,8 +417,8 @@ void ChecksumCallbackHandler::Run() {
 void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
         const string& tablet_id,
         const Schema& schema,
-        const ChecksumOptions& options,
-        ChecksumProgressCallbacks* callbacks) {
+        const KsckChecksumOptions& options,
+        KsckChecksumProgressCallbacks* callbacks) {
   gscoped_ptr<ChecksumStepper> stepper(
       new ChecksumStepper(tablet_id, schema, uuid(), options, callbacks, ts_proxy_));
   stepper->Start();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 07bb0b7..54d0f1d 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -54,7 +54,9 @@ class TabletServerServiceProxy;
 
 namespace tools {
 
+class KsckChecksumProgressCallbacks;
 enum class KsckServerHealth;
+struct KsckChecksumOptions;
 
 // This implementation connects to a master via RPC.
 class RemoteKsckMaster : public KsckMaster {
@@ -106,8 +108,8 @@ class RemoteKsckTabletServer : public KsckTabletServer {
   void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
       const Schema& schema,
-      const ChecksumOptions& options,
-      ChecksumProgressCallbacks* callbacks) override;
+      const KsckChecksumOptions& options,
+      KsckChecksumProgressCallbacks* callbacks) override;
 
   virtual std::string address() const override {
     return host_port_.ToString();


Mime
View raw message