kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [2/3] kudu git commit: [master/tserver] enforce re-replication scheme consistency
Date Tue, 09 Jan 2018 01:40:58 GMT
[master/tserver] enforce re-replication scheme consistency

Don't register a tablet server with master if they run with different
tablet replica replacement schemes.

Added an integration test to verify the desired behavior.

Change-Id: I71c4c2e72bb2d62cec6de0f6d00b418377e8ae85
Reviewed-on: http://gerrit.cloudera.org:8080/8901
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1769eed5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1769eed5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1769eed5

Branch: refs/heads/master
Commit: 1769eed53ee2c21a88a766cb72bf8c9ae622099d
Parents: 6efc50d
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Wed Dec 13 13:21:50 2017 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Jan 9 01:35:19 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |  3 +-
 src/kudu/consensus/replica_management.proto     | 41 ++++++++++++
 .../raft_consensus_nonvoter-itest.cc            | 66 +++++++++++++++++++-
 src/kudu/master/master.proto                    |  9 +++
 src/kudu/master/master_service.cc               | 41 +++++++++++-
 src/kudu/tserver/heartbeater.cc                 | 37 ++++++++---
 6 files changed, 182 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 03fad03..ef2f9a7 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -25,7 +25,8 @@ PROTOBUF_GENERATE_CPP(
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
   PROTO_FILES
     metadata.proto
-    opid.proto)
+    opid.proto
+    replica_management.proto)
 set(METADATA_PROTO_LIBS
   kudu_common_proto
   fs_proto

http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/consensus/replica_management.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/replica_management.proto b/src/kudu/consensus/replica_management.proto
new file mode 100644
index 0000000..b9a1ee1
--- /dev/null
+++ b/src/kudu/consensus/replica_management.proto
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+package kudu.consensus;
+
+option java_package = "org.apache.kudu.consensus";
+
+// Communicates replica management information between servers.
+message ReplicaManagementInfoPB {
+  // Replica replacement schemes.
+  enum ReplacementScheme {
+    UNKNOWN = 999;
+
+    // The leader replica evicts the failed replica first, and then the new
+    // voter replica is added.
+    EVICT_FIRST = 0;
+
+    // Add a new non-voter replica, promote the replica to voter once it has
+    // caught up with the leader, and only after that evict the failed replica.
+    PREPARE_REPLACEMENT_BEFORE_EVICTION = 1;
+  }
+
+  // Using 'optional' instead of 'required' because at some point we may decide
+  // to obsolete this field.
+  optional ReplacementScheme replacement_scheme = 1;
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index f0350dc..6726109 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -57,6 +57,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
@@ -88,6 +89,7 @@ using kudu::itest::StartElection;
 using kudu::itest::TServerDetails;
 using kudu::itest::TabletServerMap;
 using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForNumTabletsOnTS;
 using kudu::itest::WaitForReplicasReportedToMaster;
 using kudu::master::ANY_REPLICA;
 using kudu::master::GetTableLocationsResponsePB;
@@ -1242,9 +1244,9 @@ TEST_F(RaftConsensusNonVoterITest, CatalogManagerAddsNonVoter) {
   const MonoDelta kTimeout = MonoDelta::FromSeconds(6 * kReplicaUnavailableSec);
   const int kReplicasNum = 3;
   FLAGS_num_replicas = kReplicasNum;
-  // Need one extra tserver after the tserver with on of the replicas stopped.
-  // Otherwise, the catalog manager would not be able to spawn a new non-voter
-  // replacement replica.
+  // Will need one extra tserver after the tserver with existing voter replica
+  // is stopped. Otherwise, the catalog manager would not be able to spawn
+  // a new non-voter replacement replica.
   FLAGS_num_tablet_servers = kReplicasNum + 1;
   const vector<string> kMasterFlags = {
     // The scenario runs with the new replica management scheme.
@@ -1828,5 +1830,63 @@ TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) {
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
+// This test runs master and tablet server with different replica replacement
+// schemes and makes sure that the tablet server is not registered with the
+// master in such case.
+// Also, it makes sure the tablet server crashes to signal the misconfiguration.
+class IncompatibleReplicaReplacementSchemesITest :
+    public RaftConsensusNonVoterITest,
+    public ::testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_CASE_P(, IncompatibleReplicaReplacementSchemesITest,
+                        ::testing::Bool());
+TEST_P(IncompatibleReplicaReplacementSchemesITest, MasterAndTserverMisconfig) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const int64_t heartbeat_interval_ms = 500;
+
+  const bool is_3_4_3 = GetParam();
+
+  // The easiest way to have everything set up is to start the cluster with
+  // compatible parameters.
+  const vector<string> kMasterFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3),
+  };
+  const vector<string> kTsFlags = {
+    Substitute("--heartbeat_interval_ms=$0", heartbeat_interval_ms),
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3),
+  };
+
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+  vector<master::ListTabletServersResponsePB_Entry> tservers;
+  ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(), kTimeout, &tservers));
+  ASSERT_EQ(1, tservers.size());
+
+  ASSERT_EQ(1, cluster_->num_tablet_servers());
+  auto* ts = cluster_->tablet_server(0);
+  ASSERT_NE(nullptr, ts);
+  ts->Shutdown();
+
+  auto* master = cluster_->master(0);
+  master->Shutdown();
+  ASSERT_OK(master->Restart());
+
+  // Update corresponding flags to induce a misconfiguration between the master
+  // and the tablet server.
+  ts->mutable_flags()->push_back(
+      Substitute("--raft_prepare_replacement_before_eviction=$0", !is_3_4_3));
+  ASSERT_OK(ts->Restart());
+
+  ASSERT_OK(ts->WaitForFatal(kTimeout));
+
+  SleepFor(MonoDelta::FromMilliseconds(heartbeat_interval_ms * 3));
+
+  // The tablet server should not be registered with the master.
+  ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(), kTimeout, &tservers));
+  ASSERT_EQ(0, tservers.size());
+}
+
 }  // namespace tserver
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 5342960..048489f 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -22,6 +22,7 @@ option java_package = "org.apache.kudu.master";
 import "kudu/common/common.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/consensus/metadata.proto";
+import "kudu/consensus/replica_management.proto";
 import "kudu/rpc/rpc_header.proto";
 import "kudu/security/token.proto";
 import "kudu/tablet/metadata.proto";
@@ -73,6 +74,10 @@ message MasterErrorPB {
 
     // The number of replicas requested is illegal (eg non-positive).
     ILLEGAL_REPLICATION_FACTOR = 11;
+
+    // The caller detected an application-level incompatibility with the callee.
+    // This might be caused by a misconfiguration, incompatible features, etc.
+    INCOMPATIBILITY = 12;
   }
 
   // The error code.
@@ -283,6 +288,10 @@ message TSHeartbeatRequestPB {
   // The most recently known TSK sequence number. Allows the master to
   // selectively notify the tablet server of more recent TSKs.
   optional int64 latest_tsk_seq_num = 6;
+
+  // Replica management parameters that the tablet server is running with.
+  // This field is set only if the registration field is present.
+  optional consensus.ReplicaManagementInfoPB replica_management_info = 7;
 }
 
 message TSHeartbeatResponsePB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 904b93c..6dc8965 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -24,11 +24,13 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/replica_management.pb.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
@@ -45,9 +47,12 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
 DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0,
              "Number of milliseconds that the master will sleep before responding to "
              "requests for tablet locations.");
@@ -67,6 +72,7 @@ DEFINE_bool(master_non_leader_masters_propagate_tsk, false,
 TAG_FLAG(master_non_leader_masters_propagate_tsk, hidden);
 
 using google::protobuf::Message;
+using kudu::consensus::ReplicaManagementInfoPB;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::security::SignedTokenPB;
@@ -146,13 +152,43 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
   // 3. Register or look up the tserver.
   shared_ptr<TSDescriptor> ts_desc;
   if (req->has_registration()) {
+    const auto scheme = FLAGS_raft_prepare_replacement_before_eviction
+        ? ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
+        : ReplicaManagementInfoPB::EVICT_FIRST;
+    const auto ts_scheme = req->has_replica_management_info()
+        ? req->replica_management_info().replacement_scheme()
+        : ReplicaManagementInfoPB::EVICT_FIRST;
+    // If the catalog manager is running with some different replica management
+    // scheme, report back an error and do not register the tablet server.
+    if (scheme != ts_scheme) {
+      const auto& ts_uuid = req->common().ts_instance().permanent_uuid();
+      const auto& ts_addr = rpc->remote_address().ToString();
+      Status s = Status::ConfigurationError(
+          Substitute("replica replacement scheme ($0) of the tablet server "
+                     "$1 at $2 differs from the catalog manager's ($3); "
+                     "they must be run with the same scheme (controlled "
+                     "by the --raft_prepare_replacement_before_eviction flag)",
+                     ReplicaManagementInfoPB::ReplacementScheme_Name(ts_scheme),
+                     ts_uuid, ts_addr,
+                     ReplicaManagementInfoPB::ReplacementScheme_Name(scheme)));
+      LOG(WARNING) << s.ToString();
+
+      auto* error = resp->mutable_error();
+      StatusToPB(s, error->mutable_status());
+      error->set_code(MasterErrorPB::INCOMPATIBILITY);
+
+      // Yes, this is confusing: the RPC result is success, but the response
+      // contains an application-level error.
+      rpc->RespondSuccess();
+      return;
+    }
     Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
                                                  req->registration(),
                                                  &ts_desc);
     if (!s.ok()) {
       LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
                                  rpc->requestor_string(), s.ToString());
-      // TODO: add service-specific errors
+      // TODO(todd): add service-specific errors
       rpc->RespondFailure(s);
       return;
     }
@@ -170,7 +206,8 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
 
       rpc->RespondSuccess();
       return;
-    } else if (!s.ok()) {
+    }
+    if (!s.ok()) {
       LOG(WARNING) << Substitute("Unable to look up tserver for heartbeat "
           "request $0 from $1: $2", SecureDebugString(*req),
           rpc->requestor_string(), s.ToString());

http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 012f8df..24117c6 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -29,10 +29,12 @@
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/replica_management.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -84,6 +86,9 @@ DEFINE_int32(heartbeat_inject_latency_before_heartbeat_ms, 0,
 TAG_FLAG(heartbeat_inject_latency_before_heartbeat_ms, runtime);
 TAG_FLAG(heartbeat_inject_latency_before_heartbeat_ms, unsafe);
 
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
+using kudu::master::MasterErrorPB;
 using kudu::master::MasterServiceProxy;
 using kudu::master::TabletReportPB;
 using kudu::pb_util::SecureDebugString;
@@ -145,7 +150,7 @@ class Heartbeater::Thread {
   Status ConnectToMaster();
   int GetMinimumHeartbeatMillis() const;
   int GetMillisUntilNextHeartbeat() const;
-  Status DoHeartbeat();
+  Status DoHeartbeat(MasterErrorPB* error = nullptr);
   Status SetupRegistration(ServerRegistrationPB* reg);
   void SetupCommonField(master::TSToMasterCommonPB* common);
   bool IsCurrentThread() const;
@@ -164,7 +169,7 @@ class Heartbeater::Thread {
   scoped_refptr<kudu::Thread> thread_;
 
   // Current RPC proxy to the leader master.
-  gscoped_ptr<master::MasterServiceProxy> proxy_;
+  gscoped_ptr<MasterServiceProxy> proxy_;
 
   // The most recent response from a heartbeat.
   master::TSHeartbeatResponsePB last_hb_response_;
@@ -374,7 +379,7 @@ int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
   return FLAGS_heartbeat_interval_ms;
 }
 
-Status Heartbeater::Thread::DoHeartbeat() {
+Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB* error) {
   if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
     return Status::IOError("failing all heartbeats for tests");
   }
@@ -401,12 +406,18 @@ Status Heartbeater::Thread::DoHeartbeat() {
     LOG(INFO) << "Registering TS with master...";
     RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
                           "Unable to set up registration");
+    // If registering, let the catalog manager know what replica replacement
+    // scheme the tablet server is running with.
+    auto* info = req.mutable_replica_management_info();
+    info->set_replacement_scheme(FLAGS_raft_prepare_replacement_before_eviction
+        ? consensus::ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
+        : consensus::ReplicaManagementInfoPB::EVICT_FIRST);
   }
 
   // Check with the TS cert manager if it has a cert that needs signing.
-  // if so, send the CSR in the heartbeat for the master to sign.
+  // If so, send the CSR in the heartbeat for the master to sign.
   boost::optional<security::CertSignRequest> csr =
-    server_->mutable_tls_context()->GetCsrIfNecessary();
+      server_->mutable_tls_context()->GetCsrIfNecessary();
   if (csr != boost::none) {
     RETURN_NOT_OK(csr->ToString(req.mutable_csr_der(), security::DataFormat::DER));
     VLOG(1) << "Sending a CSR to the master in the next heartbeat";
@@ -444,7 +455,10 @@ Status Heartbeater::Thread::DoHeartbeat() {
   RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc),
                         "Failed to send heartbeat to master");
   if (resp.has_error()) {
-    return StatusFromPB(resp.error().status());
+    if (error) {
+      error->Swap(resp.mutable_error());
+    }
+    return StatusFromPB(error ? error->status() : resp.error().status());
   }
 
   VLOG(2) << Substitute("Received heartbeat response from $0:\n$1",
@@ -533,10 +547,12 @@ void Heartbeater::Thread::RunThread() {
       }
     }
 
-    Status s = DoHeartbeat();
+    MasterErrorPB error;
+    const auto& s = DoHeartbeat(&error);
     if (!s.ok()) {
+      const auto& err_msg = s.ToString();
       LOG(WARNING) << Substitute("Failed to heartbeat to $0: $1",
-                                 master_address_.ToString(), s.ToString());
+                                 master_address_.ToString(), err_msg);
       consecutive_failed_heartbeats_++;
       // If we encountered a network error (e.g., connection
       // refused), try reconnecting.
@@ -544,6 +560,9 @@ void Heartbeater::Thread::RunThread() {
           consecutive_failed_heartbeats_ >= FLAGS_heartbeat_max_failures_before_backoff) {
         proxy_.reset();
       }
+      if (error.has_code() && error.code() == MasterErrorPB::INCOMPATIBILITY) {
+        LOG(FATAL) << "master detected incompatibility: " << err_msg;
+      }
       continue;
     }
     consecutive_failed_heartbeats_ = 0;
@@ -554,7 +573,7 @@ bool Heartbeater::Thread::IsCurrentThread() const {
   return thread_.get() == kudu::Thread::current_thread();
 }
 
-void Heartbeater::Thread::MarkTabletReportAcknowledged(const master::TabletReportPB& report) {
+void Heartbeater::Thread::MarkTabletReportAcknowledged(const TabletReportPB& report) {
   std::lock_guard<simple_spinlock> l(dirty_tablets_lock_);
 
   int32_t acked_seq = report.sequence_number();


Mime
View raw message