From commits-return-5153-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Tue Jan 9 02:41:01 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 4B66C18076D for ; Tue, 9 Jan 2018 02:41:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3B3FF160C2C; Tue, 9 Jan 2018 01:41:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 05F6F160C3F for ; Tue, 9 Jan 2018 02:40:59 +0100 (CET) Received: (qmail 47333 invoked by uid 500); 9 Jan 2018 01:40:59 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 47314 invoked by uid 99); 9 Jan 2018 01:40:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jan 2018 01:40:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0567CDFFDA; Tue, 9 Jan 2018 01:40:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@kudu.apache.org Date: Tue, 09 Jan 2018 01:40:58 -0000 Message-Id: In-Reply-To: <93597e45545f4523a0eee17a005d3521@git.apache.org> References: <93597e45545f4523a0eee17a005d3521@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] kudu git commit: [master/tserver] enforce re-replication scheme consistency [master/tserver] enforce re-replication scheme consistency Don't register a tablet server with master if they run with different tablet replica replacement schemes. Added an integration test to verify the desired behavior. Change-Id: I71c4c2e72bb2d62cec6de0f6d00b418377e8ae85 Reviewed-on: http://gerrit.cloudera.org:8080/8901 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1769eed5 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1769eed5 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1769eed5 Branch: refs/heads/master Commit: 1769eed53ee2c21a88a766cb72bf8c9ae622099d Parents: 6efc50d Author: Alexey Serbin Authored: Wed Dec 13 13:21:50 2017 -0800 Committer: Mike Percy Committed: Tue Jan 9 01:35:19 2018 +0000 ---------------------------------------------------------------------- src/kudu/consensus/CMakeLists.txt | 3 +- src/kudu/consensus/replica_management.proto | 41 ++++++++++++ .../raft_consensus_nonvoter-itest.cc | 66 +++++++++++++++++++- src/kudu/master/master.proto | 9 +++ src/kudu/master/master_service.cc | 41 +++++++++++- src/kudu/tserver/heartbeater.cc | 37 ++++++++--- 6 files changed, 182 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/consensus/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt index 03fad03..ef2f9a7 100644 --- a/src/kudu/consensus/CMakeLists.txt +++ b/src/kudu/consensus/CMakeLists.txt @@ -25,7 +25,8 @@ PROTOBUF_GENERATE_CPP( BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. PROTO_FILES metadata.proto - opid.proto) + opid.proto + replica_management.proto) set(METADATA_PROTO_LIBS kudu_common_proto fs_proto http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/consensus/replica_management.proto ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/replica_management.proto b/src/kudu/consensus/replica_management.proto new file mode 100644 index 0000000..b9a1ee1 --- /dev/null +++ b/src/kudu/consensus/replica_management.proto @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto2"; +package kudu.consensus; + +option java_package = "org.apache.kudu.consensus"; + +// Communicates replica management information between servers. +message ReplicaManagementInfoPB { + // Replica replacement schemes. + enum ReplacementScheme { + UNKNOWN = 999; + + // The leader replica evicts the failed replica first, and then the new + // voter replica is added. + EVICT_FIRST = 0; + + // Add a new non-voter replica, promote the replica to voter once it + // caught up with the leader, and only after that evict the failed replica. + PREPARE_REPLACEMENT_BEFORE_EVICTION = 1; + } + + // Using 'optional' instead of 'required' because at some point we may decide + // to obsolete this field. + optional ReplacementScheme replacement_scheme = 1; +} http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc index f0350dc..6726109 100644 --- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc +++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc @@ -57,6 +57,7 @@ #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" +DECLARE_int32(heartbeat_interval_ms); DECLARE_int32(num_replicas); DECLARE_int32(num_tablet_servers); DECLARE_double(leader_failure_max_missed_heartbeat_periods); @@ -88,6 +89,7 @@ using kudu::itest::StartElection; using kudu::itest::TServerDetails; using kudu::itest::TabletServerMap; using kudu::itest::WAIT_FOR_LEADER; +using kudu::itest::WaitForNumTabletsOnTS; using kudu::itest::WaitForReplicasReportedToMaster; using kudu::master::ANY_REPLICA; using kudu::master::GetTableLocationsResponsePB; @@ -1242,9 +1244,9 @@ TEST_F(RaftConsensusNonVoterITest, CatalogManagerAddsNonVoter) { const MonoDelta kTimeout = MonoDelta::FromSeconds(6 * kReplicaUnavailableSec); const int kReplicasNum = 3; FLAGS_num_replicas = kReplicasNum; - // Need one extra tserver after the tserver with on of the replicas stopped. - // Otherwise, the catalog manager would not be able to spawn a new non-voter - // replacement replica. + // Will need one extra tserver after the tserver with existing voter replica + // is stopped. Otherwise, the catalog manager would not be able to spawn + // a new non-voter replacement replica. FLAGS_num_tablet_servers = kReplicasNum + 1; const vector kMasterFlags = { // The scenario runs with the new replica management scheme. @@ -1828,5 +1830,63 @@ TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) { NO_FATALS(cluster_->AssertNoCrashes()); } +// This test runs master and tablet server with different replica replacement +// schemes and makes sure that the tablet server is not registered with the +// master in such case. +// Also, it makes sure the tablet server crashes to signal the misconfiguration. +class IncompatibleReplicaReplacementSchemesITest : + public RaftConsensusNonVoterITest, + public ::testing::WithParamInterface { +}; +INSTANTIATE_TEST_CASE_P(, IncompatibleReplicaReplacementSchemesITest, + ::testing::Bool()); +TEST_P(IncompatibleReplicaReplacementSchemesITest, MasterAndTserverMisconfig) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + const MonoDelta kTimeout = MonoDelta::FromSeconds(60); + const int64_t heartbeat_interval_ms = 500; + + const bool is_3_4_3 = GetParam(); + + // The easiest way to have everything setup is to start the cluster with + // compatible parameters. + const vector kMasterFlags = { + Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3), + }; + const vector kTsFlags = { + Substitute("--heartbeat_interval_ms=$0", heartbeat_interval_ms), + Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3), + }; + + NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); + + vector tservers; + ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(), kTimeout, &tservers)); + ASSERT_EQ(1, tservers.size()); + + ASSERT_EQ(1, cluster_->num_tablet_servers()); + auto* ts = cluster_->tablet_server(0); + ASSERT_NE(nullptr, ts); + ts->Shutdown(); + + auto* master = cluster_->master(0); + master->Shutdown(); + ASSERT_OK(master->Restart()); + + // Update corresponding flags to induce a misconfiguration between the master + // and the tablet server. + ts->mutable_flags()->push_back( + Substitute("--raft_prepare_replacement_before_eviction=$0", !is_3_4_3)); + ASSERT_OK(ts->Restart()); + + ASSERT_OK(ts->WaitForFatal(kTimeout)); + + SleepFor(MonoDelta::FromMilliseconds(heartbeat_interval_ms * 3)); + + // The tablet server should not be registered with the master. + ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(), kTimeout, &tservers)); + ASSERT_EQ(0, tservers.size()); +} + } // namespace tserver } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/master/master.proto ---------------------------------------------------------------------- diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index 5342960..048489f 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -22,6 +22,7 @@ option java_package = "org.apache.kudu.master"; import "kudu/common/common.proto"; import "kudu/common/wire_protocol.proto"; import "kudu/consensus/metadata.proto"; +import "kudu/consensus/replica_management.proto"; import "kudu/rpc/rpc_header.proto"; import "kudu/security/token.proto"; import "kudu/tablet/metadata.proto"; @@ -73,6 +74,10 @@ message MasterErrorPB { // The number of replicas requested is illegal (eg non-positive). ILLEGAL_REPLICATION_FACTOR = 11; + + // The caller detected an application-level incompatibility with the callee. + // This might be caused by a misconfiguration, incompatible features, etc. + INCOMPATIBILITY = 12; } // The error code. @@ -283,6 +288,10 @@ message TSHeartbeatRequestPB { // The most recently known TSK sequence number. Allows the master to // selectively notify the tablet server of more recent TSKs. optional int64 latest_tsk_seq_num = 6; + + // Replica management parameters that the tablet server is running with. + // This field is set only if the registration field is present. + optional consensus.ReplicaManagementInfoPB replica_management_info = 7; } message TSHeartbeatResponsePB { http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/master/master_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index 904b93c..6dc8965 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -24,11 +24,13 @@ #include #include +#include #include #include "kudu/common/common.pb.h" #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/consensus/replica_management.pb.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/master/catalog_manager.h" #include "kudu/master/master.h" @@ -45,9 +47,12 @@ #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" #include "kudu/util/status.h" +DECLARE_bool(raft_prepare_replacement_before_eviction); + DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0, "Number of milliseconds that the master will sleep before responding to " "requests for tablet locations."); @@ -67,6 +72,7 @@ DEFINE_bool(master_non_leader_masters_propagate_tsk, false, TAG_FLAG(master_non_leader_masters_propagate_tsk, hidden); using google::protobuf::Message; +using kudu::consensus::ReplicaManagementInfoPB; using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using kudu::security::SignedTokenPB; @@ -146,13 +152,43 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req, // 3. Register or look up the tserver. shared_ptr ts_desc; if (req->has_registration()) { + const auto scheme = FLAGS_raft_prepare_replacement_before_eviction + ? ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION + : ReplicaManagementInfoPB::EVICT_FIRST; + const auto ts_scheme = req->has_replica_management_info() + ? req->replica_management_info().replacement_scheme() + : ReplicaManagementInfoPB::EVICT_FIRST; + // If the catalog manager is running with some different replica management + // scheme, report back an error and do not register the tablet server. + if (scheme != ts_scheme) { + const auto& ts_uuid = req->common().ts_instance().permanent_uuid(); + const auto& ts_addr = rpc->remote_address().ToString(); + Status s = Status::ConfigurationError( + Substitute("replica replacement scheme ($0) of the tablet server " + "$1 at $2 differs from the catalog manager's ($3); " + "they must be run with the same scheme (controlled " + "by the --raft_prepare_replacement_before_eviction flag)", + ReplicaManagementInfoPB::ReplacementScheme_Name(ts_scheme), + ts_uuid, ts_addr, + ReplicaManagementInfoPB::ReplacementScheme_Name(scheme))); + LOG(WARNING) << s.ToString(); + + auto* error = resp->mutable_error(); + StatusToPB(s, error->mutable_status()); + error->set_code(MasterErrorPB::INCOMPATIBILITY); + + // Yes, this is confusing: the RPC result is success, but the response + // contains an application-level error. + rpc->RespondSuccess(); + return; + } Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(), req->registration(), &ts_desc); if (!s.ok()) { LOG(WARNING) << Substitute("Unable to register tserver ($0): $1", rpc->requestor_string(), s.ToString()); - // TODO: add service-specific errors + // TODO(todd): add service-specific errors rpc->RespondFailure(s); return; } @@ -170,7 +206,8 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req, rpc->RespondSuccess(); return; - } else if (!s.ok()) { + } + if (!s.ok()) { LOG(WARNING) << Substitute("Unable to look up tserver for heartbeat " "request $0 from $1: $2", SecureDebugString(*req), rpc->requestor_string(), s.ToString()); http://git-wip-us.apache.org/repos/asf/kudu/blob/1769eed5/src/kudu/tserver/heartbeater.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc index 012f8df..24117c6 100644 --- a/src/kudu/tserver/heartbeater.cc +++ b/src/kudu/tserver/heartbeater.cc @@ -29,10 +29,12 @@ #include #include +#include #include #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/consensus/replica_management.pb.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" @@ -84,6 +86,9 @@ DEFINE_int32(heartbeat_inject_latency_before_heartbeat_ms, 0, TAG_FLAG(heartbeat_inject_latency_before_heartbeat_ms, runtime); TAG_FLAG(heartbeat_inject_latency_before_heartbeat_ms, unsafe); +DECLARE_bool(raft_prepare_replacement_before_eviction); + +using kudu::master::MasterErrorPB; using kudu::master::MasterServiceProxy; using kudu::master::TabletReportPB; using kudu::pb_util::SecureDebugString; @@ -145,7 +150,7 @@ class Heartbeater::Thread { Status ConnectToMaster(); int GetMinimumHeartbeatMillis() const; int GetMillisUntilNextHeartbeat() const; - Status DoHeartbeat(); + Status DoHeartbeat(MasterErrorPB* error = nullptr); Status SetupRegistration(ServerRegistrationPB* reg); void SetupCommonField(master::TSToMasterCommonPB* common); bool IsCurrentThread() const; @@ -164,7 +169,7 @@ class Heartbeater::Thread { scoped_refptr thread_; // Current RPC proxy to the leader master. - gscoped_ptr proxy_; + gscoped_ptr proxy_; // The most recent response from a heartbeat. master::TSHeartbeatResponsePB last_hb_response_; @@ -374,7 +379,7 @@ int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const { return FLAGS_heartbeat_interval_ms; } -Status Heartbeater::Thread::DoHeartbeat() { +Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB* error) { if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) { return Status::IOError("failing all heartbeats for tests"); } @@ -401,12 +406,18 @@ Status Heartbeater::Thread::DoHeartbeat() { LOG(INFO) << "Registering TS with master..."; RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()), "Unable to set up registration"); + // If registering, let the catalog manager know what replica replacement + // scheme the tablet server is running with. + auto* info = req.mutable_replica_management_info(); + info->set_replacement_scheme(FLAGS_raft_prepare_replacement_before_eviction + ? consensus::ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION + : consensus::ReplicaManagementInfoPB::EVICT_FIRST); } // Check with the TS cert manager if it has a cert that needs signing. - // if so, send the CSR in the heartbeat for the master to sign. + // If so, send the CSR in the heartbeat for the master to sign. boost::optional csr = - server_->mutable_tls_context()->GetCsrIfNecessary(); + server_->mutable_tls_context()->GetCsrIfNecessary(); if (csr != boost::none) { RETURN_NOT_OK(csr->ToString(req.mutable_csr_der(), security::DataFormat::DER)); VLOG(1) << "Sending a CSR to the master in the next heartbeat"; @@ -444,7 +455,10 @@ Status Heartbeater::Thread::DoHeartbeat() { RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc), "Failed to send heartbeat to master"); if (resp.has_error()) { - return StatusFromPB(resp.error().status()); + if (error) { + error->Swap(resp.mutable_error()); + } + return StatusFromPB(error ? error->status() : resp.error().status()); } VLOG(2) << Substitute("Received heartbeat response from $0:\n$1", @@ -533,10 +547,12 @@ void Heartbeater::Thread::RunThread() { } } - Status s = DoHeartbeat(); + MasterErrorPB error; + const auto& s = DoHeartbeat(&error); if (!s.ok()) { + const auto& err_msg = s.ToString(); LOG(WARNING) << Substitute("Failed to heartbeat to $0: $1", - master_address_.ToString(), s.ToString()); + master_address_.ToString(), err_msg); consecutive_failed_heartbeats_++; // If we encountered a network error (e.g., connection // refused), try reconnecting. @@ -544,6 +560,9 @@ void Heartbeater::Thread::RunThread() { consecutive_failed_heartbeats_ >= FLAGS_heartbeat_max_failures_before_backoff) { proxy_.reset(); } + if (error.has_code() && error.code() == MasterErrorPB::INCOMPATIBILITY) { + LOG(FATAL) << "master detected incompatibility: " << err_msg; + } continue; } consecutive_failed_heartbeats_ = 0; @@ -554,7 +573,7 @@ bool Heartbeater::Thread::IsCurrentThread() const { return thread_.get() == kudu::Thread::current_thread(); } -void Heartbeater::Thread::MarkTabletReportAcknowledged(const master::TabletReportPB& report) { +void Heartbeater::Thread::MarkTabletReportAcknowledged(const TabletReportPB& report) { std::lock_guard l(dirty_tablets_lock_); int32_t acked_seq = report.sequence_number();