kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] branch master updated: [tools] a small cleanup on ksck code
Date Tue, 21 May 2019 20:54:23 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 38c2e12  [tools] a small cleanup on ksck code
38c2e12 is described below

commit 38c2e122292b262a4a2ee1af931b27dd272bd862
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Tue May 14 11:57:27 2019 -0700

    [tools] a small cleanup on ksck code
    
    Change-Id: I14d2451cc757c95543be6fa8c7dc5c95c7d73be8
    Reviewed-on: http://gerrit.cloudera.org:8080/13379
    Tested-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/tools/ksck.cc        | 121 +++++++++++++++++++++---------------------
 src/kudu/tools/ksck.h         |   9 ++--
 src/kudu/tools/ksck_remote.cc |  76 +++++++++++++-------------
 3 files changed, 101 insertions(+), 105 deletions(-)

diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 0a77cc0..134319f 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -18,6 +18,8 @@
 #include "kudu/tools/ksck.h"
 
 #include <algorithm>
+#include <atomic>
+#include <cstddef>
 #include <cstdint>
 #include <iostream>
 #include <iterator>
@@ -41,7 +43,6 @@
 #include "kudu/tools/color.h"
 #include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/tool_action_common.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/string_case.h"
@@ -70,14 +71,12 @@ DEFINE_string(ksck_format, "plain_concise",
 DEFINE_bool(consensus, true,
             "Whether to check the consensus state from each tablet against the master.");
 
+using std::atomic;
 using std::cout;
-using std::endl;
 using std::ostream;
 using std::ostringstream;
-using std::set;
 using std::shared_ptr;
 using std::string;
-using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -154,20 +153,20 @@ std::ostream& operator<<(std::ostream& lhs, KsckFetchState state) {
 Ksck::Ksck(shared_ptr<KsckCluster> cluster, ostream* out)
     : cluster_(std::move(cluster)),
       out_(out == nullptr ? &std::cout : out) {
-  CHECK_OK(ThreadPoolBuilder("Ksck-fetch")
+  CHECK_OK(ThreadPoolBuilder("ksck-fetch")
                .set_max_threads(FLAGS_fetch_info_concurrency)
                .set_idle_timeout(MonoDelta::FromMilliseconds(10))
                .Build(&pool_));
 }
 
 Status Ksck::CheckMasterHealth() {
-  int num_masters = static_cast<int>(cluster_->masters().size());
+  const auto num_masters = cluster_->masters().size();
   if (num_masters == 0) {
-    return Status::NotFound("No masters found");
+    return Status::NotFound("no masters found");
   }
 
-  AtomicInt<int32_t> bad_masters(0);
-  AtomicInt<int32_t> unauthorized_masters(0);
+  atomic<size_t> bad_masters(0);
+  atomic<size_t> unauthorized_masters(0);
 
   vector<KsckServerHealthSummary> master_summaries;
   simple_spinlock master_summaries_lock;
@@ -185,16 +184,16 @@ Status Ksck::CheckMasterHealth() {
         if (!s.ok()) {
           if (IsNotAuthorizedMethodAccess(s)) {
             sh.health = KsckServerHealth::UNAUTHORIZED;
-            unauthorized_masters.Increment();
+            ++unauthorized_masters;
           } else {
             sh.health = KsckServerHealth::UNAVAILABLE;
           }
-          bad_masters.Increment();
+          ++bad_masters;
         }
 
         {
           std::lock_guard<simple_spinlock> lock(master_summaries_lock);
-          master_summaries.push_back(std::move(sh));
+          master_summaries.emplace_back(std::move(sh));
         }
 
         // Fetch the flags information.
@@ -202,7 +201,7 @@ Status Ksck::CheckMasterHealth() {
         s = master->FetchUnusualFlags();
         if (!s.ok()) {
           std::lock_guard<simple_spinlock> lock(master_summaries_lock);
-          results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
+          results_.warning_messages.emplace_back(s.CloneAndPrepend(Substitute(
               "unable to get flag information for master $0 ($1)",
               master->uuid(),
               master->address())));
@@ -215,16 +214,16 @@ Status Ksck::CheckMasterHealth() {
 
   // Return a NotAuthorized status if any master has auth errors, since this
   // indicates ksck may not be able to gather full and accurate info.
-  if (unauthorized_masters.Load() > 0) {
+  if (unauthorized_masters > 0) {
     return Status::NotAuthorized(
         Substitute("failed to gather info from $0 of $1 "
                    "masters due to lack of admin privileges",
-                   unauthorized_masters.Load(), num_masters));
+                   unauthorized_masters.load(), num_masters));
   }
-  if (bad_masters.Load() > 0) {
+  if (bad_masters > 0) {
     return Status::NetworkError(
         Substitute("failed to gather info from all masters: $0 of $1 had errors",
-                   bad_masters.Load(), num_masters));
+                   bad_masters.load(), num_masters));
   }
   return Status::OK();
 }
@@ -323,16 +322,15 @@ Status Ksck::FetchTableAndTabletInfo() {
 }
 
 Status Ksck::FetchInfoFromTabletServers() {
-  VLOG(1) << "Fetching the list of tablet servers";
-  int servers_count = static_cast<int>(cluster_->tablet_servers().size());
+  const auto servers_count = cluster_->tablet_servers().size();
   VLOG(1) << Substitute("List of $0 tablet servers retrieved", servers_count);
 
   if (servers_count == 0) {
     return Status::OK();
   }
 
-  AtomicInt<int32_t> bad_servers(0);
-  AtomicInt<int32_t> unauthorized_servers(0);
+  atomic<size_t> bad_servers(0);
+  atomic<size_t> unauthorized_servers(0);
   VLOG(1) << "Fetching info from all " << servers_count << " tablet servers";
 
   vector<KsckServerHealthSummary> tablet_server_summaries;
@@ -341,44 +339,45 @@ Status Ksck::FetchInfoFromTabletServers() {
   for (const auto& entry : cluster_->tablet_servers()) {
     const auto& ts = entry.second;
     RETURN_NOT_OK(pool_->SubmitFunc([&]() {
-        VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
-        KsckServerHealth health;
-        Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
-            if (FLAGS_consensus) {
-              return ts->FetchConsensusState(&health);
-            }
-            return Status::OK();
-        });
-        KsckServerHealthSummary summary;
-        summary.uuid = ts->uuid();
-        summary.address = ts->address();
-        summary.ts_location = ts->location();
-        summary.version = ts->version();
-        summary.status = s;
-        if (!s.ok()) {
-          if (IsNotAuthorizedMethodAccess(s)) {
-            health = KsckServerHealth::UNAUTHORIZED;
-            unauthorized_servers.Increment();
+      VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
+      KsckServerHealth health;
+      Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
+          if (FLAGS_consensus) {
+            return ts->FetchConsensusState(&health);
           }
-          bad_servers.Increment();
+          return Status::OK();
+      });
+      KsckServerHealthSummary summary;
+      summary.uuid = ts->uuid();
+      summary.address = ts->address();
+      summary.ts_location = ts->location();
+      summary.version = ts->version();
+      summary.status = s;
+      if (!s.ok()) {
+        if (IsNotAuthorizedMethodAccess(s)) {
+          health = KsckServerHealth::UNAUTHORIZED;
+          ++unauthorized_servers;
         }
-        summary.health = health;
+        ++bad_servers;
+      }
+      summary.health = health;
 
-        {
-          std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
-          tablet_server_summaries.push_back(std::move(summary));
-        }
+      {
+        std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+        tablet_server_summaries.push_back(std::move(summary));
+      }
 
-        // Fetch the flags information.
-        // Failing to gather flags is only a warning.
-        s = ts->FetchUnusualFlags();
-        if (!s.ok()) {
-          std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
-          results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
-              "unable to get flag information for tablet server $0 ($1)",
-              ts->uuid(),
-              ts->address())));
-        }
+      // Fetch the flags information.
+      // Failing to gather flags is only a warning since fetching the flags
+      // is not supported in older versions.
+      s = ts->FetchUnusualFlags();
+      if (!s.ok()) {
+        std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+        results_.warning_messages.emplace_back(s.CloneAndPrepend(Substitute(
+            "unable to get flag information for tablet server $0 ($1)",
+            ts->uuid(),
+            ts->address())));
+      }
     }));
   }
   pool_->Wait();
@@ -387,16 +386,16 @@ Status Ksck::FetchInfoFromTabletServers() {
 
   // Return a NotAuthorized status if any tablet server has auth errors, since
   // this indicates ksck may not be able to gather full and accurate info.
-  if (unauthorized_servers.Load() > 0) {
+  if (unauthorized_servers > 0) {
     return Status::NotAuthorized(
         Substitute("failed to gather info from $0 of $1 tablet servers due "
                    "to lack of admin privileges",
-                   unauthorized_servers.Load(), servers_count));
+                   unauthorized_servers.load(), servers_count));
   }
-  if (bad_servers.Load() > 0) {
+  if (bad_servers > 0) {
     return Status::NetworkError(
-      Substitute("failed to gather info for all tablet servers: $0 of $1 had errors",
-                 bad_servers.Load(), servers_count));
+        Substitute("failed to gather info for all tablet servers: $0 of $1 had errors",
+                   bad_servers.load(), servers_count));
   }
   return Status::OK();
 }
@@ -594,7 +593,7 @@ Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
 }
 
 bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
-  const auto all_tablets = table->tablets();
+  const auto& all_tablets = table->tablets();
   vector<shared_ptr<KsckTablet>> tablets;
   std::copy_if(all_tablets.begin(), all_tablets.end(), std::back_inserter(tablets),
                  [&](const shared_ptr<KsckTablet>& t) {
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index ca788ab..181af26 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -123,11 +123,12 @@ class KsckTablet {
 // Representation of a table. Composed of tablets.
 class KsckTable {
  public:
-  KsckTable(std::string id, std::string name, const Schema& schema, int num_replicas)
+  KsckTable(std::string id, std::string name, Schema schema, int num_replicas)
       : id_(std::move(id)),
         name_(std::move(name)),
-        schema_(schema),
-        num_replicas_(num_replicas) {}
+        schema_(std::move(schema)),
+        num_replicas_(num_replicas) {
+  }
 
   const std::string& id() const {
     return id_;
@@ -149,7 +150,7 @@ class KsckTable {
     tablets_ = std::move(tablets);
   }
 
-  std::vector<std::shared_ptr<KsckTablet>>& tablets() {
+  const std::vector<std::shared_ptr<KsckTablet>>& tablets() const {
     return tablets_;
   }
 
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index f217108..efa2ba6 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -61,7 +61,6 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -74,27 +73,26 @@ DECLARE_int32(fetch_info_concurrency);
 
 DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
 
-namespace kudu {
-namespace tools {
-
-static const std::string kMessengerName = "ksck";
-
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduReplica;
-using client::KuduScanToken;
-using client::KuduScanTokenBuilder;
-using client::KuduTable;
-using client::KuduTabletServer;
-using client::internal::ReplicaController;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
-using rpc::RpcController;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTabletServer;
+using kudu::client::internal::ReplicaController;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RpcController;
 using std::shared_ptr;
 using std::string;
 using std::vector;
 using strings::Substitute;
 
+namespace kudu {
+namespace tools {
+
+static const std::string kMessengerName = "ksck";
+
 namespace {
 MonoDelta GetDefaultTimeout() {
   return MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
@@ -516,54 +514,52 @@ Status RemoteKsckCluster::RetrieveTablesList() {
   vector<string> table_names;
   RETURN_NOT_OK(client_->ListTables(&table_names));
 
-  int num_tables = static_cast<int>(table_names.size());
-  if (num_tables == 0) {
+  if (table_names.empty()) {
     return Status::OK();
   }
 
-  AtomicInt<int32_t> bad_tables(0);
   vector<shared_ptr<KsckTable>> tables;
+  tables.reserve(table_names.size());
   simple_spinlock tables_lock;
 
   for (const auto& table_name : table_names) {
     RETURN_NOT_OK(pool_->SubmitFunc([&]() {
-        client::sp::shared_ptr<KuduTable> t;
-        Status s = client_->OpenTable(table_name, &t);
-        if (!s.ok()) {
-          bad_tables.Increment();
-          LOG(ERROR) << Substitute("unable to open table $0: $1", table_name, s.ToString());
-          return;
-        }
-        shared_ptr<KsckTable> table(new KsckTable(t->id(),
-                                                  table_name,
-                                                  *t->schema().schema_,
-                                                  t->num_replicas()));
-        {
-          std::lock_guard<simple_spinlock> l(tables_lock);
-          tables.push_back(table);
-        }
+      client::sp::shared_ptr<KuduTable> t;
+      Status s = client_->OpenTable(table_name, &t);
+      if (!s.ok()) {
+        LOG(ERROR) << Substitute("unable to open table $0: $1",
+                                 table_name, s.ToString());
+        return;
+      }
+      shared_ptr<KsckTable> table(new KsckTable(t->id(),
+                                                table_name,
+                                                KuduSchema::ToSchema(t->schema()),
+                                                t->num_replicas()));
+      {
+        std::lock_guard<simple_spinlock> l(tables_lock);
+        tables.emplace_back(std::move(table));
+      }
     }));
   }
   pool_->Wait();
 
   tables_.swap(tables);
 
-  if (bad_tables.Load() > 0) {
+  if (tables_.size() < table_names.size()) {
     return Status::NetworkError(
         Substitute("failed to gather info from all tables: $0 of $1 had errors",
-                   bad_tables.Load(), num_tables));
+                   table_names.size() - tables_.size(), table_names.size()));
   }
 
   return Status::OK();
 }
 
 Status RemoteKsckCluster::RetrieveAllTablets() {
-  int num_tables = static_cast<int>(tables().size());
-  if (num_tables == 0) {
+  if (tables_.empty()) {
     return Status::OK();
   }
 
-  for (const shared_ptr<KsckTable>& table : tables()) {
+  for (const auto& table : tables_) {
     RETURN_NOT_OK(pool_->SubmitFunc(
         std::bind(&KsckCluster::RetrieveTabletsList, this, table)));
   }


Mime
View raw message