Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 96D28200C88 for ; Fri, 2 Jun 2017 20:47:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 946EC160BD2; Fri, 2 Jun 2017 18:47:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 42513160BBA for ; Fri, 2 Jun 2017 20:47:10 +0200 (CEST) Received: (qmail 39219 invoked by uid 500); 2 Jun 2017 18:47:09 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 39210 invoked by uid 99); 2 Jun 2017 18:47:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Jun 2017 18:47:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 36776DFDC8; Fri, 2 Jun 2017 18:47:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@kudu.apache.org Message-Id: <2d313ba15b2046cc91d5c92576c40644@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kudu git commit: consensus: Remove Consensus interface Date: Fri, 2 Jun 2017 18:47:09 +0000 (UTC) archived-at: Fri, 02 Jun 2017 18:47:12 -0000 Repository: kudu Updated Branches: refs/heads/master 037f72b37 -> e3b7d4dc1 consensus: Remove Consensus interface We only have one Consensus implementation now, and have no plans for additional implementations in the future. So we can remove this interface. 
Change-Id: I21b976cb92619083e1f1766b13634b841b554c8c Reviewed-on: http://gerrit.cloudera.org:8080/7040 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e3b7d4dc Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e3b7d4dc Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e3b7d4dc Branch: refs/heads/master Commit: e3b7d4dc1a6be020f3f126a66192ca1451e95e6f Parents: 037f72b Author: Mike Percy Authored: Wed May 31 21:21:22 2017 -0700 Committer: Mike Percy Committed: Fri Jun 2 18:46:53 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/CMakeLists.txt | 1 - src/kudu/consensus/consensus-test-util.h | 5 +- src/kudu/consensus/consensus.cc | 81 ---- src/kudu/consensus/consensus.h | 430 ------------------- src/kudu/consensus/leader_election.h | 2 +- src/kudu/consensus/pending_rounds.cc | 1 + src/kudu/consensus/pending_rounds.h | 8 +- src/kudu/consensus/raft_consensus.cc | 71 ++- src/kudu/consensus/raft_consensus.h | 357 +++++++++++++-- .../consensus/raft_consensus_quorum-test.cc | 4 +- .../ts_tablet_manager-itest.cc | 4 +- src/kudu/master/catalog_manager.cc | 12 +- src/kudu/master/sys_catalog.cc | 2 +- src/kudu/tablet/tablet_replica.cc | 4 +- src/kudu/tablet/tablet_replica.h | 10 +- src/kudu/tablet/transactions/transaction.h | 4 +- .../tablet/transactions/transaction_driver.cc | 9 +- .../tablet/transactions/transaction_driver.h | 14 +- .../tablet/transactions/write_transaction.h | 4 - src/kudu/tserver/tablet_copy_source_session.cc | 4 +- src/kudu/tserver/tablet_service.cc | 30 +- src/kudu/tserver/ts_tablet_manager.cc | 8 +- src/kudu/tserver/tserver-path-handlers.cc | 4 +- 23 files changed, 448 insertions(+), 621 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt index e93ea94..ec53919 100644 --- a/src/kudu/consensus/CMakeLists.txt +++ b/src/kudu/consensus/CMakeLists.txt @@ -96,7 +96,6 @@ target_link_libraries(log consensus_metadata_proto) set(CONSENSUS_SRCS - consensus.cc consensus_meta.cc consensus_peers.cc consensus_queue.cc http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h index 89707aa..380d82b 100644 --- a/src/kudu/consensus/consensus-test-util.h +++ b/src/kudu/consensus/consensus-test-util.h @@ -27,7 +27,6 @@ #include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol.h" -#include "kudu/consensus/consensus.h" #include "kudu/consensus/consensus_peers.h" #include "kudu/consensus/consensus_queue.h" #include "kudu/consensus/log.h" @@ -643,7 +642,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory { CHECK_OK(ThreadPoolBuilder("test-txn-factory").set_max_threads(1).Build(&pool_)); } - void SetConsensus(Consensus* consensus) { + void SetConsensus(RaftConsensus* consensus) { consensus_ = consensus; } @@ -673,7 +672,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory { private: gscoped_ptr pool_; - Consensus* consensus_; + RaftConsensus* consensus_; Log* log_; }; http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus.cc b/src/kudu/consensus/consensus.cc deleted file mode 100644 index 0553b5c..0000000 --- a/src/kudu/consensus/consensus.cc +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "kudu/consensus/consensus.h" - -#include - -#include "kudu/consensus/log_util.h" -#include "kudu/consensus/opid_util.h" -#include "kudu/gutil/stl_util.h" -#include "kudu/gutil/strings/substitute.h" - -namespace kudu { -namespace consensus { - -using std::shared_ptr; -using strings::Substitute; - -ConsensusBootstrapInfo::ConsensusBootstrapInfo() - : last_id(MinimumOpId()), - last_committed_id(MinimumOpId()) { -} - -ConsensusBootstrapInfo::~ConsensusBootstrapInfo() { - STLDeleteElements(&orphaned_replicates); -} - -ConsensusRound::ConsensusRound(Consensus* consensus, - gscoped_ptr replicate_msg, - ConsensusReplicatedCallback replicated_cb) - : consensus_(consensus), - replicate_msg_(new RefCountedReplicate(replicate_msg.release())), - replicated_cb_(std::move(replicated_cb)), - bound_term_(-1) {} - -ConsensusRound::ConsensusRound(Consensus* consensus, - const ReplicateRefPtr& replicate_msg) - : consensus_(consensus), - replicate_msg_(replicate_msg), - bound_term_(-1) { - DCHECK(replicate_msg_); -} - -void ConsensusRound::NotifyReplicationFinished(const Status& status) { - if (PREDICT_FALSE(replicated_cb_.is_null())) return; - replicated_cb_.Run(status); -} - -Status ConsensusRound::CheckBoundTerm(int64_t current_term) const { - if (PREDICT_FALSE(bound_term_ != -1 
&& - bound_term_ != current_term)) { - return Status::Aborted( - strings::Substitute( - "Transaction submitted in term $0 cannot be replicated in term $1", - bound_term_, current_term)); - } - return Status::OK(); -} - -scoped_refptr Consensus::NewRound( - gscoped_ptr replicate_msg, - const ConsensusReplicatedCallback& replicated_cb) { - return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb)); -} - -} // namespace consensus -} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h deleted file mode 100644 index 59910be..0000000 --- a/src/kudu/consensus/consensus.h +++ /dev/null @@ -1,430 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-#ifndef KUDO_QUORUM_CONSENSUS_H_ -#define KUDO_QUORUM_CONSENSUS_H_ - -#include -#include -#include -#include -#include - -#include "kudu/consensus/consensus.pb.h" -#include "kudu/consensus/ref_counted_replicate.h" -#include "kudu/gutil/callback.h" -#include "kudu/gutil/gscoped_ptr.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/util/status.h" -#include "kudu/util/status_callback.h" - -namespace kudu { -class MonoDelta; - -namespace log { -class Log; -struct RetentionIndexes; -} - -namespace master { -class SysCatalogTable; -} - -namespace server { -class Clock; -} - -namespace tablet { -class TabletReplica; -} - -namespace tserver { -class TabletServerErrorPB; -} - -namespace consensus { - -class ConsensusCommitContinuation; -class ConsensusRound; -class ReplicaTransactionFactory; -class TimeManager; - -typedef int64_t ConsensusTerm; - -typedef StatusCallback ConsensusReplicatedCallback; - -struct ConsensusOptions { - std::string tablet_id; -}; - -// After completing bootstrap, some of the results need to be plumbed through -// into the consensus implementation. -struct ConsensusBootstrapInfo { - ConsensusBootstrapInfo(); - ~ConsensusBootstrapInfo(); - - // The id of the last operation in the log - OpId last_id; - - // The id of the last committed operation in the log. - OpId last_committed_id; - - // REPLICATE messages which were in the log with no accompanying - // COMMIT. These need to be passed along to consensus init in order - // to potentially commit them. - // - // These are owned by the ConsensusBootstrapInfo instance. - std::vector orphaned_replicates; - - private: - DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo); -}; - -// The external interface for a consensus peer. -// -// Note: Even though Consensus points to Log, it needs to be destroyed -// after it. See Log class header comment for the reason why. On the other -// hand Consensus must be quiesced before closing the log, otherwise it -// will try to write to a destroyed/closed log. 
-// -// The order of these operations on shutdown must therefore be: -// 1 - quiesce Consensus -// 2 - close/destroy Log -// 3 - destroy Consensus -class Consensus : public RefCountedThreadSafe { - public: - Consensus() {} - - // Starts running the consensus algorithm. - virtual Status Start(const ConsensusBootstrapInfo& info) = 0; - - // Returns true if consensus is running. - virtual bool IsRunning() const = 0; - - // Emulates a leader election by simply making this peer leader. - virtual Status EmulateElection() = 0; - - // Modes for StartElection(). - enum ElectionMode { - // A normal leader election. Peers will not vote for this node - // if they believe that a leader is alive. - NORMAL_ELECTION, - - // A "pre-election". Peers will vote as they would for a normal - // election, except that the votes will not be "binding". In other - // words, they will not durably record their vote. - PRE_ELECTION, - - // In this mode, peers will vote for this candidate even if they - // think a leader is alive. This can be used for a faster hand-off - // between a leader and one of its replicas. - ELECT_EVEN_IF_LEADER_IS_ALIVE - }; - - // Reasons for StartElection(). - enum ElectionReason { - // The election is being called because the Raft configuration has only - // a single node and has just started up. - INITIAL_SINGLE_NODE_ELECTION, - - // The election is being called because the timeout expired. In other - // words, the previous leader probably failed (or there was no leader - // in this term) - ELECTION_TIMEOUT_EXPIRED, - - // The election is being started because of an explicit external request. - EXTERNAL_REQUEST - }; - - // Triggers a leader election. - virtual Status StartElection(ElectionMode mode, ElectionReason reason) = 0; - - // Wait until the node has LEADER role. - // Returns Status::TimedOut if the role is not LEADER within 'timeout'. - virtual Status WaitUntilLeaderForTests(const MonoDelta& timeout) = 0; - - // Implement a LeaderStepDown() request. 
- virtual Status StepDown(LeaderStepDownResponsePB* resp) { - return Status::NotSupported("Not implemented."); - } - - // Creates a new ConsensusRound, the entity that owns all the data - // structures required for a consensus round, such as the ReplicateMsg - // (and later on the CommitMsg). ConsensusRound will also point to and - // increase the reference count for the provided callbacks. - scoped_refptr NewRound( - gscoped_ptr replicate_msg, - const ConsensusReplicatedCallback& replicated_cb); - - // Called by a Leader to replicate an entry to the state machine. - // - // From the leader instance perspective execution proceeds as follows: - // - // Leader RaftConfig - // + + - // 1) Req->| Replicate() | - // | | - // 2) +-------------replicate-------------->| - // |<---------------ACK------------------+ - // | | - // 3) +--+ | - // <----+ round.NotifyReplicationFinished()| - // | | - // 3a) | +------ update commitIndex ------->| - // | | - // - // 1) Caller calls Replicate(), method returns immediately to the caller and - // runs asynchronously. - // - // 2) Leader replicates the entry to the peers using the consensus - // algorithm, proceeds as soon as a majority of voters acknowledges the - // entry. - // - // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished, - // which calls the ConsensusReplicatedCallback. - // - // 3a) The leader asynchronously notifies other peers of the new - // commit index, which tells them to apply the operation. - // - // This method can only be called on the leader, i.e. role() == LEADER - virtual Status Replicate(const scoped_refptr& round) = 0; - - // Ensures that the consensus implementation is currently acting as LEADER, - // and thus is allowed to submit operations to be prepared before they are - // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the - // implementation also stores the current term inside the round's "bound_term" - // member. 
When we eventually are about to replicate the transaction, we verify - // that the term has not changed in the meantime. - virtual Status CheckLeadershipAndBindTerm(const scoped_refptr& round) { - return Status::OK(); - } - - // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their - // state machines. This is equivalent to "AppendEntries()" in Raft - // terminology. - // - // ConsensusRequestPB contains a sequence of 0 or more operations to apply - // on the replica. If there are 0 operations the request is considered - // 'status-only' i.e. the leader is communicating with the follower only - // in order to pass back and forth information on watermarks (eg committed - // operation ID, replicated op id, etc). - // - // If the sequence contains 1 or more operations they will be replicated - // in the same order as the leader, and submitted for asynchronous Prepare - // in the same order. - // - // The leader also provides information on the index of the latest - // operation considered committed by consensus. The replica uses this - // information to update the state of any pending (previously replicated/prepared) - // transactions. - // - // Returns Status::OK if the response has been filled (regardless of accepting - // or rejecting the specific request). Returns non-OK Status if a specific - // error response could not be formed, which will result in the service - // returning an UNKNOWN_ERROR RPC error code to the caller and including the - // stringified Status message. - virtual Status Update(const ConsensusRequestPB* request, - ConsensusResponsePB* response) = 0; - - // Messages sent from CANDIDATEs to voting peers to request their vote - // in leader election. - virtual Status RequestVote(const VoteRequestPB* request, - VoteResponsePB* response) = 0; - - // Implement a ChangeConfig() request. 
- virtual Status ChangeConfig(const ChangeConfigRequestPB& req, - const StatusCallback& client_cb, - boost::optional* error) { - return Status::NotSupported("Not implemented."); - } - - virtual Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, - tserver::TabletServerErrorPB::Code* error) = 0; - - // Returns the current Raft role of this instance. - virtual RaftPeerPB::Role role() const = 0; - - // Returns the uuid of this peer. - virtual const std::string& peer_uuid() const = 0; - - // Returns the id of the tablet whose updates this consensus instance helps coordinate. - virtual const std::string& tablet_id() const = 0; - - virtual scoped_refptr time_manager() const = 0; - - // Returns a copy of the state of the consensus system. - virtual ConsensusStatePB ConsensusState() const = 0; - - // Returns a copy of the current committed Raft configuration. - virtual RaftConfigPB CommittedConfig() const = 0; - - virtual void DumpStatusHtml(std::ostream& out) const = 0; - - // Stops running the consensus algorithm. - virtual void Shutdown() = 0; - - // Returns the last OpId (either received or committed, depending on the - // 'type' argument) that the Consensus implementation knows about. - // Primarily used for testing purposes. - virtual Status GetLastOpId(OpIdType type, OpId* id) { - return Status::NotFound("Not implemented."); - } - - - // Return the log indexes which the consensus implementation would like to retain. - // - // The returned 'for_durability' index ensures that no logs are GCed before - // the operation is fully committed. The returned 'for_peers' index indicates - // the index of the farthest-behind peer so that the log will try to avoid - // GCing these before the peer has caught up. - virtual log::RetentionIndexes GetRetentionIndexes() = 0; - - protected: - friend class RefCountedThreadSafe; - friend class tablet::TabletReplica; - friend class master::SysCatalogTable; - - // This class is refcounted. 
- virtual ~Consensus() {} - - enum State { - kNotInitialized, - kInitializing, - kConfiguring, - kRunning, - }; - private: - DISALLOW_COPY_AND_ASSIGN(Consensus); -}; - -// Factory for replica transactions. -// An implementation of this factory must be registered prior to consensus -// start, and is used to create transactions when the consensus implementation receives -// messages from the leader. -// -// Replica transactions execute the following way: -// -// - When a ReplicateMsg is first received from the leader, the Consensus -// instance creates the ConsensusRound and calls StartReplicaTransaction(). -// This will trigger the Prepare(). At the same time replica consensus -// instance immediately stores the ReplicateMsg in the Log. Once the replicate -// message is stored in stable storage an ACK is sent to the leader (i.e. the -// replica Consensus instance does not wait for Prepare() to finish). -// -// - When the CommitMsg for a replicate is first received from the leader -// the replica waits for the corresponding Prepare() to finish (if it has -// not completed yet) and then proceeds to trigger the Apply(). -// -// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging -// a CommitMsg to the log to ensure that the operation can be properly restored -// on a restart. -class ReplicaTransactionFactory { - public: - virtual Status StartReplicaTransaction(const scoped_refptr& context) = 0; - - virtual ~ReplicaTransactionFactory() {} -}; - -// Context for a consensus round on the LEADER side, typically created as an -// out-parameter of Consensus::Append. -// This class is ref-counted because we want to ensure it stays alive for the -// duration of the Transaction when it is associated with a Transaction, while -// we also want to ensure it has a proper lifecycle when a ConsensusRound is -// pushed that is not associated with a Tablet transaction. 
-class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> { - - public: - // Ctor used for leader transactions. Leader transactions can and must specify the - // callbacks prior to initiating the consensus round. - ConsensusRound(Consensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg, - ConsensusReplicatedCallback replicated_cb); - - // Ctor used for follower/learner transactions. These transactions do not use the - // replicate callback and the commit callback is set later, after the transaction - // is actually started. - ConsensusRound(Consensus* consensus, - const ReplicateRefPtr& replicate_msg); - - ReplicateMsg* replicate_msg() { - return replicate_msg_->get(); - } - - const ReplicateRefPtr& replicate_scoped_refptr() { - return replicate_msg_; - } - - // Returns the id of the (replicate) operation this context - // refers to. This is only set _after_ Consensus::Replicate(context). - OpId id() const { - return replicate_msg_->get()->id(); - } - - // Register a callback that is called by Consensus to notify that the round - // is considered either replicated, if 'status' is OK(), or that it has - // permanently failed to replicate if 'status' is anything else. If 'status' - // is OK() then the operation can be applied to the state machine, otherwise - // the operation should be aborted. - void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) { - replicated_cb_ = replicated_cb; - } - - // If a continuation was set, notifies it that the round has been replicated. - void NotifyReplicationFinished(const Status& status); - - // Binds this round such that it may not be eventually executed in any term - // other than 'term'. - // See CheckBoundTerm(). 
- void BindToTerm(int64_t term) { - DCHECK_EQ(bound_term_, -1); - bound_term_ = term; - } - - // Check for a rare race in which an operation is submitted to the LEADER in some term, - // then before the operation is prepared, the replica loses its leadership, receives - // more operations as a FOLLOWER, and then regains its leadership. We detect this case - // by setting the ConsensusRound's "bound term" when it is first submitted to the - // PREPARE queue, and validate that the term is still the same when we have finished - // preparing it. See KUDU-597 for details. - // - // If this round has not been bound to any term, this is a no-op. - Status CheckBoundTerm(int64_t current_term) const; - - private: - friend class RaftConsensusQuorumTest; - friend class RefCountedThreadSafe<ConsensusRound>; - - ~ConsensusRound() {} - - Consensus* consensus_; - // This round's replicate message. - ReplicateRefPtr replicate_msg_; - - // The continuation that will be called once the transaction is - // deemed committed/aborted by consensus. - ConsensusReplicatedCallback replicated_cb_; - - // The leader term that this round was submitted in. CheckBoundTerm() - // ensures that, when it is eventually replicated, the term has not - // changed in the meantime. - // - // Set to -1 if no term has been bound. 
- int64_t bound_term_; -}; - -} // namespace consensus -} // namespace kudu - -#endif /* CONSENSUS_H_ */ http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/leader_election.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h index d0129f4..d0f15ce 100644 --- a/src/kudu/consensus/leader_election.h +++ b/src/kudu/consensus/leader_election.h @@ -23,8 +23,8 @@ #include #include -#include "kudu/consensus/consensus.h" #include "kudu/consensus/consensus.pb.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/gutil/callback.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc index 121cb24..adbca50 100644 --- a/src/kudu/consensus/pending_rounds.cc +++ b/src/kudu/consensus/pending_rounds.cc @@ -17,6 +17,7 @@ #include "kudu/consensus/pending_rounds.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/consensus/time_manager.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/substitute.h" http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h index 02a8686..ccc9cd4 100644 --- a/src/kudu/consensus/pending_rounds.h +++ b/src/kudu/consensus/pending_rounds.h @@ -20,14 +20,16 @@ #include #include -#include "kudu/consensus/consensus.h" +#include "kudu/consensus/opid.pb.h" #include "kudu/consensus/opid_util.h" #include "kudu/gutil/macros.h" -#include "kudu/util/status.h" +#include "kudu/gutil/ref_counted.h" namespace kudu { -namespace consensus { +class 
Status; +namespace consensus { +class ConsensusRound; class TimeManager; // Tracks the pending consensus rounds being managed by a Raft replica (either leader http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index b7f5cdc..d50bf9a 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -377,24 +377,24 @@ Status RaftConsensus::EmulateElection() { } namespace { -const char* ModeString(Consensus::ElectionMode mode) { +const char* ModeString(RaftConsensus::ElectionMode mode) { switch (mode) { - case Consensus::NORMAL_ELECTION: + case RaftConsensus::NORMAL_ELECTION: return "leader election"; - case Consensus::PRE_ELECTION: + case RaftConsensus::PRE_ELECTION: return "pre-election"; - case Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: + case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: return "forced leader election"; } __builtin_unreachable(); // silence gcc warnings } -string ReasonString(Consensus::ElectionReason reason, StringPiece leader_uuid) { +string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) { switch (reason) { - case Consensus::INITIAL_SINGLE_NODE_ELECTION: + case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION: return "initial election of a single-replica configuration"; - case Consensus::EXTERNAL_REQUEST: + case RaftConsensus::EXTERNAL_REQUEST: return "received explicit request"; - case Consensus::ELECTION_TIMEOUT_EXPIRED: + case RaftConsensus::ELECTION_TIMEOUT_EXPIRED: if (leader_uuid.empty()) { return "no leader contacted us within the election timeout"; } @@ -522,6 +522,12 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { return Status::OK(); } +scoped_refptr RaftConsensus::NewRound( + gscoped_ptr replicate_msg, + const ConsensusReplicatedCallback& replicated_cb) { + 
return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb)); +} + void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) { DCHECK_EQ(name, kTimerId); // Start an election. @@ -2676,5 +2682,54 @@ ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const { return cmeta_.get(); } +//////////////////////////////////////////////////////////////////////// +// ConsensusBootstrapInfo +//////////////////////////////////////////////////////////////////////// + +ConsensusBootstrapInfo::ConsensusBootstrapInfo() + : last_id(MinimumOpId()), + last_committed_id(MinimumOpId()) { +} + +ConsensusBootstrapInfo::~ConsensusBootstrapInfo() { + STLDeleteElements(&orphaned_replicates); +} + +//////////////////////////////////////////////////////////////////////// +// ConsensusRound +//////////////////////////////////////////////////////////////////////// + +ConsensusRound::ConsensusRound(RaftConsensus* consensus, + gscoped_ptr replicate_msg, + ConsensusReplicatedCallback replicated_cb) + : consensus_(consensus), + replicate_msg_(new RefCountedReplicate(replicate_msg.release())), + replicated_cb_(std::move(replicated_cb)), + bound_term_(-1) {} + +ConsensusRound::ConsensusRound(RaftConsensus* consensus, + const ReplicateRefPtr& replicate_msg) + : consensus_(consensus), + replicate_msg_(replicate_msg), + bound_term_(-1) { + DCHECK(replicate_msg_); +} + +void ConsensusRound::NotifyReplicationFinished(const Status& status) { + if (PREDICT_FALSE(replicated_cb_.is_null())) return; + replicated_cb_.Run(status); +} + +Status ConsensusRound::CheckBoundTerm(int64_t current_term) const { + if (PREDICT_FALSE(bound_term_ != -1 && + bound_term_ != current_term)) { + return Status::Aborted( + strings::Substitute( + "Transaction submitted in term $0 cannot be replicated in term $1", + bound_term_, current_term)); + } + return Status::OK(); +} + } // namespace consensus } // namespace kudu 
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index f00a8c2..9ce29ec 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -24,41 +24,99 @@ #include #include -#include "kudu/consensus/consensus.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/consensus_meta.h" #include "kudu/consensus/consensus_queue.h" +#include "kudu/consensus/peer_manager.h" #include "kudu/consensus/pending_rounds.h" #include "kudu/consensus/time_manager.h" +#include "kudu/gutil/ref_counted.h" #include "kudu/util/atomic.h" #include "kudu/util/failure_detector.h" +#include "kudu/util/status_callback.h" namespace kudu { class Counter; class FailureDetector; class HostPort; +class MonoDelta; +class Status; class ThreadPool; -namespace server { -class Clock; +namespace log { +class Log; +struct RetentionIndexes; } namespace rpc { class Messenger; } +namespace server { +class Clock; +} + +namespace tserver { +class TabletServerErrorPB; +} + namespace consensus { + class ConsensusMetadata; +class ConsensusRound; class Peer; class PeerProxyFactory; class PeerManager; +class ReplicaTransactionFactory; class TimeManager; + +struct ConsensusBootstrapInfo; struct ElectionResult; -class RaftConsensus : public Consensus, +struct ConsensusOptions { + std::string tablet_id; +}; + +typedef int64_t ConsensusTerm; +typedef StatusCallback ConsensusReplicatedCallback; + +class RaftConsensus : public RefCountedThreadSafe, public PeerMessageQueueObserver { public: + + // Modes for StartElection(). + enum ElectionMode { + // A normal leader election. Peers will not vote for this node + // if they believe that a leader is alive. + NORMAL_ELECTION, + + // A "pre-election". 
Peers will vote as they would for a normal + // election, except that the votes will not be "binding". In other + // words, they will not durably record their vote. + PRE_ELECTION, + + // In this mode, peers will vote for this candidate even if they + // think a leader is alive. This can be used for a faster hand-off + // between a leader and one of its replicas. + ELECT_EVEN_IF_LEADER_IS_ALIVE + }; + + // Reasons for StartElection(). + enum ElectionReason { + // The election is being called because the Raft configuration has only + // a single node and has just started up. + INITIAL_SINGLE_NODE_ELECTION, + + // The election is being called because the timeout expired. In other + // words, the previous leader probably failed (or there was no leader + // in this term) + ELECTION_TIMEOUT_EXPIRED, + + // The election is being started because of an explicit external request. + EXTERNAL_REQUEST + }; + static scoped_refptr Create( ConsensusOptions options, std::unique_ptr cmeta, @@ -85,64 +143,150 @@ class RaftConsensus : public Consensus, std::shared_ptr parent_mem_tracker, Callback mark_dirty_clbk); - virtual ~RaftConsensus(); + // Starts running the Raft consensus algorithm. + Status Start(const ConsensusBootstrapInfo& info); - Status Start(const ConsensusBootstrapInfo& info) override; - - bool IsRunning() const override; + // Returns true if RaftConsensus is running. + bool IsRunning() const; // Emulates an election by increasing the term number and asserting leadership // in the configuration by sending a NO_OP to other peers. // This is NOT safe to use in a distributed configuration with failure detection // enabled, as it could result in a split-brain scenario. - Status EmulateElection() override; + Status EmulateElection(); + + // Triggers a leader election. + Status StartElection(ElectionMode mode, ElectionReason reason); - Status StartElection(ElectionMode mode, ElectionReason reason) override; + // Wait until the node has LEADER role. 
+ // Returns Status::TimedOut if the role is not LEADER within 'timeout'. + Status WaitUntilLeaderForTests(const MonoDelta& timeout); - Status WaitUntilLeaderForTests(const MonoDelta& timeout) override; + // Implement a LeaderStepDown() request. + Status StepDown(LeaderStepDownResponsePB* resp); - Status StepDown(LeaderStepDownResponsePB* resp) override; + // Creates a new ConsensusRound, the entity that owns all the data + // structures required for a consensus round, such as the ReplicateMsg + // (and later on the CommitMsg). ConsensusRound will also point to and + // increase the reference count for the provided callbacks. + scoped_refptr NewRound( + gscoped_ptr replicate_msg, + const ConsensusReplicatedCallback& replicated_cb); // Call StartElection(), log a warning if the call fails (usually due to // being shut down). void ReportFailureDetected(const std::string& name, const Status& msg); - Status Replicate(const scoped_refptr& round) override; - - Status CheckLeadershipAndBindTerm(const scoped_refptr& round) override; - + // Called by a Leader to replicate an entry to the state machine. + // + // From the leader instance perspective execution proceeds as follows: + // + // Leader RaftConfig + // + + + // 1) Req->| Replicate() | + // | | + // 2) +-------------replicate-------------->| + // |<---------------ACK------------------+ + // | | + // 3) +--+ | + // <----+ round.NotifyReplicationFinished()| + // | | + // 3a) | +------ update commitIndex ------->| + // | | + // + // 1) Caller calls Replicate(), method returns immediately to the caller and + // runs asynchronously. + // + // 2) Leader replicates the entry to the peers using the consensus + // algorithm, proceeds as soon as a majority of voters acknowledges the + // entry. + // + // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished, + // which calls the ConsensusReplicatedCallback. 
+ // + // 3a) The leader asynchronously notifies other peers of the new + // commit index, which tells them to apply the operation. + // + // This method can only be called on the leader, i.e. role() == LEADER + Status Replicate(const scoped_refptr& round); + + // Ensures that the consensus implementation is currently acting as LEADER, + // and thus is allowed to submit operations to be prepared before they are + // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the + // implementation also stores the current term inside the round's "bound_term" + // member. When we eventually are about to replicate the transaction, we verify + // that the term has not changed in the meantime. + Status CheckLeadershipAndBindTerm(const scoped_refptr& round); + + // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their + // state machines. This is equivalent to "AppendEntries()" in Raft + // terminology. + // + // ConsensusRequestPB contains a sequence of 0 or more operations to apply + // on the replica. If there are 0 operations the request is considered + // 'status-only' i.e. the leader is communicating with the follower only + // in order to pass back and forth information on watermarks (eg committed + // operation ID, replicated op id, etc). + // + // If the sequence contains 1 or more operations they will be replicated + // in the same order as the leader, and submitted for asynchronous Prepare + // in the same order. + // + // The leader also provides information on the index of the latest + // operation considered committed by consensus. The replica uses this + // information to update the state of any pending (previously replicated/prepared) + // transactions. + // + // Returns Status::OK if the response has been filled (regardless of accepting + // or rejecting the specific request). 
Returns non-OK Status if a specific + // error response could not be formed, which will result in the service + // returning an UNKNOWN_ERROR RPC error code to the caller and including the + // stringified Status message. Status Update(const ConsensusRequestPB* request, - ConsensusResponsePB* response) override; + ConsensusResponsePB* response); + // Messages sent from CANDIDATEs to voting peers to request their vote + // in leader election. Status RequestVote(const VoteRequestPB* request, - VoteResponsePB* response) override; + VoteResponsePB* response); + // Implement a ChangeConfig() request. Status ChangeConfig(const ChangeConfigRequestPB& req, const StatusCallback& client_cb, - boost::optional* error_code) override; + boost::optional* error_code); + // Implement an UnsafeChangeConfig() request. Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, - tserver::TabletServerErrorPB::Code* error_code) override; + tserver::TabletServerErrorPB::Code* error_code); - Status GetLastOpId(OpIdType type, OpId* id) override; + // Returns the last OpId (either received or committed, depending on the + // 'type' argument) that the Consensus implementation knows about. + // Primarily used for testing purposes. + Status GetLastOpId(OpIdType type, OpId* id); - RaftPeerPB::Role role() const override; + // Returns the current Raft role of this instance. + RaftPeerPB::Role role() const; + // Returns the uuid of this peer. // Thread-safe. - const std::string& peer_uuid() const override; + const std::string& peer_uuid() const; + // Returns the id of the tablet whose updates this consensus instance helps coordinate. // Thread-safe. - const std::string& tablet_id() const override; + const std::string& tablet_id() const; - scoped_refptr time_manager() const override { return time_manager_; } + scoped_refptr time_manager() const { return time_manager_; } - ConsensusStatePB ConsensusState() const override; + // Returns a copy of the state of the consensus system. 
+ ConsensusStatePB ConsensusState() const; - RaftConfigPB CommittedConfig() const override; + // Returns a copy of the current committed Raft configuration. + RaftConfigPB CommittedConfig() const; - void DumpStatusHtml(std::ostream& out) const override; + void DumpStatusHtml(std::ostream& out) const; - void Shutdown() override; + // Stop running the Raft consensus algorithm. + void Shutdown(); // Makes this peer advance it's term (and step down if leader), for tests. Status AdvanceTermForTests(int64_t new_term); @@ -158,17 +302,24 @@ class RaftConsensus : public Consensus, // Updates the committed_index and triggers the Apply()s for whatever // transactions were pending. // This is idempotent. - void NotifyCommitIndex(int64_t commit_index) override; + void NotifyCommitIndex(int64_t commit_index); - void NotifyTermChange(int64_t term) override; + void NotifyTermChange(int64_t term); void NotifyFailedFollower(const std::string& uuid, int64_t term, - const std::string& reason) override; + const std::string& reason); - log::RetentionIndexes GetRetentionIndexes() override; + // Return the log indexes which the consensus implementation would like to retain. + // + // The returned 'for_durability' index ensures that no logs are GCed before + // the operation is fully committed. The returned 'for_peers' index indicates + // the index of the farthest-behind peer so that the log will try to avoid + // GCing these before the peer has caught up. + log::RetentionIndexes GetRetentionIndexes(); private: + friend class RefCountedThreadSafe; friend class RaftConsensusQuorumTest; FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind); FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind); @@ -224,6 +375,9 @@ class RaftConsensus : public Consensus, using LockGuard = std::lock_guard; using UniqueLock = std::unique_lock; + // Private because this class is refcounted. 
+ ~RaftConsensus(); + // Returns string description for State enum value. static const char* State_Name(State state); @@ -636,5 +790,142 @@ class RaftConsensus : public Consensus, DISALLOW_COPY_AND_ASSIGN(RaftConsensus); }; +// After completing bootstrap, some of the results need to be plumbed through +// into the consensus implementation. +struct ConsensusBootstrapInfo { + ConsensusBootstrapInfo(); + ~ConsensusBootstrapInfo(); + + // The id of the last operation in the log + OpId last_id; + + // The id of the last committed operation in the log. + OpId last_committed_id; + + // REPLICATE messages which were in the log with no accompanying + // COMMIT. These need to be passed along to consensus init in order + // to potentially commit them. + // + // These are owned by the ConsensusBootstrapInfo instance. + std::vector orphaned_replicates; + + private: + DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo); +}; + +// Factory for replica transactions. +// An implementation of this factory must be registered prior to consensus +// start, and is used to create transactions when the consensus implementation receives +// messages from the leader. +// +// Replica transactions execute the following way: +// +// - When a ReplicateMsg is first received from the leader, the RaftConsensus +// instance creates the ConsensusRound and calls StartReplicaTransaction(). +// This will trigger the Prepare(). At the same time replica consensus +// instance immediately stores the ReplicateMsg in the Log. Once the replicate +// message is stored in stable storage an ACK is sent to the leader (i.e. the +// replica RaftConsensus instance does not wait for Prepare() to finish). +// +// - When the CommitMsg for a replicate is first received from the leader +// the replica waits for the corresponding Prepare() to finish (if it has +// not completed yet) and then proceeds to trigger the Apply(). 
+// +// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging +// a CommitMsg to the log to ensure that the operation can be properly restored +// on a restart. +class ReplicaTransactionFactory { + public: + virtual Status StartReplicaTransaction(const scoped_refptr& context) = 0; + + virtual ~ReplicaTransactionFactory() {} +}; + +// Context for a consensus round on the LEADER side, typically created as an +// out-parameter of RaftConsensus::Append(). +// This class is ref-counted because we want to ensure it stays alive for the +// duration of the Transaction when it is associated with a Transaction, while +// we also want to ensure it has a proper lifecycle when a ConsensusRound is +// pushed that is not associated with a Tablet transaction. +class ConsensusRound : public RefCountedThreadSafe { + + public: + // Ctor used for leader transactions. Leader transactions can and must specify the + // callbacks prior to initiating the consensus round. + ConsensusRound(RaftConsensus* consensus, gscoped_ptr replicate_msg, + ConsensusReplicatedCallback replicated_cb); + + // Ctor used for follower/learner transactions. These transactions do not use the + // replicate callback and the commit callback is set later, after the transaction + // is actually started. + ConsensusRound(RaftConsensus* consensus, + const ReplicateRefPtr& replicate_msg); + + ReplicateMsg* replicate_msg() { + return replicate_msg_->get(); + } + + const ReplicateRefPtr& replicate_scoped_refptr() { + return replicate_msg_; + } + + // Returns the id of the (replicate) operation this context + // refers to. This is only set _after_ RaftConsensus::Replicate(context). + OpId id() const { + return replicate_msg_->get()->id(); + } + + // Register a callback that is called by RaftConsensus to notify that the round + // is considered either replicated, if 'status' is OK(), or that it has + // permanently failed to replicate if 'status' is anything else. 
If 'status' + // is OK() then the operation can be applied to the state machine, otherwise + // the operation should be aborted. + void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) { + replicated_cb_ = replicated_cb; + } + + // If a continuation was set, notifies it that the round has been replicated. + void NotifyReplicationFinished(const Status& status); + + // Binds this round such that it may not be eventually executed in any term + // other than 'term'. + // See CheckBoundTerm(). + void BindToTerm(int64_t term) { + DCHECK_EQ(bound_term_, -1); + bound_term_ = term; + } + + // Check for a rare race in which an operation is submitted to the LEADER in some term, + // then before the operation is prepared, the replica loses its leadership, receives + // more operations as a FOLLOWER, and then regains its leadership. We detect this case + // by setting the ConsensusRound's "bound term" when it is first submitted to the + // PREPARE queue, and validate that the term is still the same when we have finished + // preparing it. See KUDU-597 for details. + // + // If this round has not been bound to any term, this is a no-op. + Status CheckBoundTerm(int64_t current_term) const; + + private: + friend class RefCountedThreadSafe; + friend class RaftConsensusQuorumTest; + + ~ConsensusRound() {} + + RaftConsensus* consensus_; + // This round's replicate message. + ReplicateRefPtr replicate_msg_; + + // The continuation that will be called once the transaction is + // deemed committed/aborted by consensus. + ConsensusReplicatedCallback replicated_cb_; + + // The leader term that this round was submitted in. CheckBoundTerm() + // ensures that, when it is eventually replicated, the term has not + // changed in the meantime. + // + // Set to -1 if no term has been bound. 
+ int64_t bound_term_; +}; + } // namespace consensus } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index a0933c8..d45890b 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -890,8 +890,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // non-shutdown peer in the list become leader. int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests(); LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1); - ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, - Consensus::EXTERNAL_REQUEST)); + ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, + RaftConsensus::EXTERNAL_REQUEST)); WaitUntilLeaderForTests(new_leader.get()); LOG(INFO) << "Election won"; int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests(); http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/integration-tests/ts_tablet_manager-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc index 9d5c6a5..547fef0 100644 --- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc +++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc @@ -134,7 +134,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) { ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map)); ValueDeleter deleter(&ts_map); - // Collect the TabletReplicas so we get direct access to Consensus. 
+ // Collect the TabletReplicas so we get direct access to RaftConsensus. vector > tablet_replicas; for (int replica = 0; replica < kNumReplicas; replica++) { MiniTabletServer* ts = cluster_->mini_tablet_server(replica); @@ -157,7 +157,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) { SCOPED_TRACE(Substitute("Iter: $0", i)); int new_leader_idx = rand() % 2; LOG(INFO) << "Electing peer " << new_leader_idx << "..."; - consensus::Consensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus()); + consensus::RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus()); ASSERT_OK(con->EmulateElection()); LOG(INFO) << "Waiting for servers to agree..."; ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5), http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/catalog_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index c67b423..df7935e 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -222,10 +222,10 @@ namespace master { using base::subtle::NoBarrier_CompareAndSwap; using base::subtle::NoBarrier_Load; using cfile::TypeEncodingInfo; -using consensus::Consensus; using consensus::ConsensusServiceProxy; using consensus::ConsensusStatePB; using consensus::GetConsensusRole; +using consensus::RaftConsensus; using consensus::RaftPeerPB; using consensus::StartTabletCopyRequestPB; using consensus::kMinimumTerm; @@ -747,7 +747,7 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) { const string& uuid = master_->fs_manager()->uuid(); if (!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid) { return Status::IllegalState( - Substitute("Node $0 not leader. Consensus state: $1", + Substitute("Node $0 not leader. 
Raft Consensus state: $1", uuid, SecureShortDebugString(cstate))); } @@ -925,7 +925,7 @@ void CatalogManager::PrepareForLeadershipTask() { // Hack to block this function until InitSysCatalogAsync() is finished. shared_lock l(lock_); } - const Consensus* consensus = sys_catalog_->tablet_replica()->consensus(); + const RaftConsensus* consensus = sys_catalog_->tablet_replica()->consensus(); const int64_t term_before_wait = consensus->ConsensusState().current_term(); { std::lock_guard l(state_lock_); @@ -967,7 +967,7 @@ void CatalogManager::PrepareForLeadershipTask() { // task. If the error is considered fatal, LOG(FATAL) is called. const auto check = [this]( std::function func, - const Consensus& consensus, + const RaftConsensus& consensus, int64_t start_term, const char* op_description) { @@ -1101,7 +1101,7 @@ bool CatalogManager::IsInitialized() const { } RaftPeerPB::Role CatalogManager::Role() const { - scoped_refptr consensus; + scoped_refptr consensus; { std::lock_guard l(state_lock_); if (state_ == kRunning) { @@ -4164,7 +4164,7 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock( const string& uuid = catalog_->master_->fs_manager()->uuid(); if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) { leader_status_ = Status::IllegalState( - Substitute("Not the leader. Local UUID: $0, Consensus state: $1", + Substitute("Not the leader. 
Local UUID: $0, Raft Consensus state: $1", uuid, SecureShortDebugString(cstate))); return; } http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/sys_catalog.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index c25aae7..bf7ae7a 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -276,7 +276,7 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options, void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const string& reason) { CHECK_EQ(tablet_id, tablet_replica_->tablet_id()); - scoped_refptr consensus = tablet_replica_->shared_consensus(); + scoped_refptr consensus = tablet_replica_->shared_consensus(); if (!consensus) { LOG_WITH_PREFIX(WARNING) << "Received notification of tablet state change " << "but tablet no longer running. Tablet ID: " http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 67c408b..61cb3bb 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -25,7 +25,6 @@ #include #include -#include "kudu/consensus/consensus.h" #include "kudu/consensus/consensus_meta.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log_anchor_registry.h" @@ -84,7 +83,6 @@ METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time "locks.", 10000000, 2); -using consensus::Consensus; using consensus::ConsensusBootstrapInfo; using consensus::ConsensusMetadata; using consensus::ConsensusOptions; @@ -318,7 +316,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) { MonoTime now(MonoTime::Now()); MonoDelta elapsed(now - start); if (elapsed > timeout) { - return Status::TimedOut(Substitute("Consensus 
is not running after waiting for $0. State; $1", + return Status::TimedOut(Substitute("Raft Consensus is not running after waiting for $0: $1", elapsed.ToString(), TabletStatePB_Name(cached_state))); } SleepFor(MonoDelta::FromMilliseconds(1L << backoff_exp)); http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index 51cd387..15ec7dc 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -24,8 +24,8 @@ #include #include -#include "kudu/consensus/consensus.h" #include "kudu/consensus/log.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/consensus/time_manager.h" #include "kudu/gutil/callback.h" #include "kudu/gutil/ref_counted.h" @@ -73,7 +73,7 @@ class TabletReplica : public RefCountedThreadSafe, Callback mark_dirty_clbk); // Initializes the TabletReplica, namely creating the Log and initializing - // Consensus. + // RaftConsensus. 
Status Init(const std::shared_ptr& tablet, const scoped_refptr& clock, const std::shared_ptr& messenger, @@ -122,12 +122,12 @@ class TabletReplica : public RefCountedThreadSafe, virtual Status StartReplicaTransaction( const scoped_refptr& round) OVERRIDE; - consensus::Consensus* consensus() { + consensus::RaftConsensus* consensus() { std::lock_guard lock(lock_); return consensus_.get(); } - scoped_refptr shared_consensus() const { + scoped_refptr shared_consensus() const { std::lock_guard lock(lock_); return consensus_; } @@ -288,7 +288,7 @@ class TabletReplica : public RefCountedThreadSafe, scoped_refptr log_; std::shared_ptr tablet_; std::shared_ptr messenger_; - scoped_refptr consensus_; + scoped_refptr consensus_; simple_spinlock prepare_replicate_lock_; // Lock protecting state_, last_status_, as well as smart pointers to collaborating http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h index f0696a6..72db821 100644 --- a/src/kudu/tablet/transactions/transaction.h +++ b/src/kudu/tablet/transactions/transaction.h @@ -24,11 +24,11 @@ #include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol.h" -#include "kudu/consensus/consensus.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/util/auto_release_pool.h" #include "kudu/util/locks.h" -#include "kudu/util/status.h" #include "kudu/util/memory/arena.h" +#include "kudu/util/status.h" namespace kudu { http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc index cb6bf64..57f841a 100644 --- a/src/kudu/tablet/transactions/transaction_driver.cc 
+++ b/src/kudu/tablet/transactions/transaction_driver.cc @@ -19,7 +19,6 @@ #include -#include "kudu/consensus/consensus.h" #include "kudu/consensus/time_manager.h" #include "kudu/gutil/strings/strcat.h" #include "kudu/rpc/result_tracker.h" @@ -36,11 +35,9 @@ namespace kudu { namespace tablet { using consensus::CommitMsg; -using consensus::Consensus; -using consensus::ConsensusRound; -using consensus::ReplicateMsg; -using consensus::CommitMsg; using consensus::DriverType; +using consensus::RaftConsensus; +using consensus::ReplicateMsg; using log::Log; using rpc::RequestIdPB; using rpc::ResultTracker; @@ -83,7 +80,7 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba //////////////////////////////////////////////////////////// TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker, - Consensus* consensus, + RaftConsensus* consensus, Log* log, ThreadPool* prepare_pool, ThreadPool* apply_pool, http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h index 044edda..e0a987c 100644 --- a/src/kudu/tablet/transactions/transaction_driver.h +++ b/src/kudu/tablet/transactions/transaction_driver.h @@ -21,7 +21,7 @@ #include #include -#include "kudu/consensus/consensus.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/walltime.h" #include "kudu/server/clock.h" @@ -69,7 +69,7 @@ class TransactionTracker; // follower and ReplicationFinished() has already been called, then we can move // on to ApplyAsync(). // -// 4 - The Consensus implementation calls ReplicationFinished() +// 4 - RaftConsensus calls ReplicationFinished() // // This is triggered by consensus when the commit index moves past our own // OpId. 
On followers, this can happen before Prepare() finishes, and thus @@ -220,7 +220,7 @@ class TransactionDriver : public RefCountedThreadSafe { // Construct TransactionDriver. TransactionDriver does not take ownership // of any of the objects pointed to in the constructor's arguments. TransactionDriver(TransactionTracker* txn_tracker, - consensus::Consensus* consensus, + consensus::RaftConsensus* consensus, log::Log* log, ThreadPool* prepare_pool, ThreadPool* apply_pool, @@ -247,7 +247,7 @@ class TransactionDriver : public RefCountedThreadSafe { // at the next synchronization point. void Abort(const Status& status); - // Callback from Consensus when replication is complete, and thus the operation + // Callback from RaftConsensus when replication is complete, and thus the operation // is considered "committed" from the consensus perspective (ie it will be // applied on every node, and not ever truncated from the state machine history). // If status is anything different from OK() we don't proceed with the apply. @@ -307,7 +307,7 @@ class TransactionDriver : public RefCountedThreadSafe { // Submits ApplyTask to the apply pool. Status ApplyAsync(); - // Calls Transaction::Apply() followed by Consensus::Commit() with the + // Calls Transaction::Apply() followed by RaftConsensus::Commit() with the // results from the Apply(). void ApplyTask(); @@ -343,7 +343,7 @@ class TransactionDriver : public RefCountedThreadSafe { void RegisterFollowerTransactionOnResultTracker(); TransactionTracker* const txn_tracker_; - consensus::Consensus* const consensus_; + consensus::RaftConsensus* const consensus_; log::Log* const log_; ThreadPool* const prepare_pool_; ThreadPool* const apply_pool_; @@ -355,7 +355,7 @@ class TransactionDriver : public RefCountedThreadSafe { mutable simple_spinlock lock_; // A copy of the transaction's OpId, set when the transaction first - // receives one from Consensus and uninitialized until then. 
+ // receives one from RaftConsensus and uninitialized until then. // TODO(todd): we have three separate copies of this now -- in TransactionState, // CommitMsg, and here... we should be able to consolidate! consensus::OpId op_id_copy_; http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/write_transaction.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h index f8ea48d..c9aa80f 100644 --- a/src/kudu/tablet/transactions/write_transaction.h +++ b/src/kudu/tablet/transactions/write_transaction.h @@ -36,10 +36,6 @@ struct DecodedRowOperation; class ConstContiguousRow; class RowwiseRowBlockPB; -namespace consensus { -class Consensus; -} - namespace tserver { class WriteRequestPB; class WriteResponsePB; http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_copy_source_session.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc index 2926792..85faa49 100644 --- a/src/kudu/tserver/tablet_copy_source_session.cc +++ b/src/kudu/tserver/tablet_copy_source_session.cc @@ -120,12 +120,12 @@ Status TabletCopySourceSession::Init() { // We do this after snapshotting the log to avoid a scenario where the latest // entry in the log has a term higher than the term stored in the consensus // metadata, which will results in a CHECK failure on RaftConsensus init. - scoped_refptr consensus = tablet_replica_->shared_consensus(); + scoped_refptr consensus = tablet_replica_->shared_consensus(); if (!consensus) { tablet::TabletStatePB tablet_state = tablet_replica_->state(); return Status::IllegalState(Substitute( "Unable to initialize tablet copy session for tablet $0. " - "Consensus is not available. Tablet state: $1 ($2)", + "Raft Consensus is not available. 
Tablet state: $1 ($2)", tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state)); } initial_cstate_ = consensus->ConsensusState(); http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 82740ab..68fa88b 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -28,7 +28,7 @@ #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.h" -#include "kudu/consensus/consensus.h" +#include "kudu/consensus/raft_consensus.h" #include "kudu/consensus/time_manager.h" #include "kudu/gutil/bind.h" #include "kudu/gutil/casts.h" @@ -103,7 +103,6 @@ DECLARE_int32(tablet_history_max_age_sec); using google::protobuf::RepeatedPtrField; using kudu::consensus::ChangeConfigRequestPB; using kudu::consensus::ChangeConfigResponsePB; -using kudu::consensus::Consensus; using kudu::consensus::ConsensusRequestPB; using kudu::consensus::ConsensusResponsePB; using kudu::consensus::GetLastOpIdRequestPB; @@ -113,6 +112,7 @@ using kudu::consensus::LeaderStepDownRequestPB; using kudu::consensus::LeaderStepDownResponsePB; using kudu::consensus::UnsafeChangeConfigRequestPB; using kudu::consensus::UnsafeChangeConfigResponsePB; +using kudu::consensus::RaftConsensus; using kudu::consensus::RunLeaderElectionRequestPB; using kudu::consensus::RunLeaderElectionResponsePB; using kudu::consensus::StartTabletCopyRequestPB; @@ -214,10 +214,10 @@ template bool GetConsensusOrRespond(const scoped_refptr& replica, RespClass* resp, rpc::RpcContext* context, - scoped_refptr* consensus) { + scoped_refptr* consensus) { *consensus = replica->shared_consensus(); if (!*consensus) { - Status s = Status::ServiceUnavailable("Consensus unavailable. Tablet not running"); + Status s = Status::ServiceUnavailable("Raft Consensus unavailable. 
Tablet not running"); SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::TABLET_NOT_RUNNING, context); return false; @@ -855,8 +855,8 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, replica->permanent_uuid(); - // Submit the update directly to the TabletReplica's Consensus instance. - scoped_refptr consensus; + // Submit the update directly to the TabletReplica's RaftConsensus instance. + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->Update(req, resp); if (PREDICT_FALSE(!s.ok())) { @@ -886,7 +886,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, } // Submit the vote request directly to the consensus instance. - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->RequestVote(req, resp); if (PREDICT_FALSE(!s.ok())) { @@ -911,7 +911,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, return; } - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; boost::optional error_code; Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code); @@ -935,7 +935,7 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* return; } - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; TabletServerErrorPB::Code error_code; Status s = consensus->UnsafeChangeConfig(*req, &error_code); @@ -966,11 +966,11 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r return; } - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->StartElection( - consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, - 
consensus::Consensus::EXTERNAL_REQUEST); + consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, + consensus::RaftConsensus::EXTERNAL_REQUEST); if (PREDICT_FALSE(!s.ok())) { SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, @@ -992,7 +992,7 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, return; } - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->StepDown(resp); if (PREDICT_FALSE(!s.ok())) { @@ -1022,7 +1022,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re TabletServerErrorPB::TABLET_NOT_RUNNING, context); return; } - scoped_refptr consensus; + scoped_refptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"), @@ -1057,7 +1057,7 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR continue; } - scoped_refptr consensus(replica->shared_consensus()); + scoped_refptr consensus(replica->shared_consensus()); if (!consensus) { continue; } http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/ts_tablet_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index db4c51d..e4a676c 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -303,7 +303,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id, "Couldn't create tablet metadata"); // We must persist the consensus metadata to disk before starting a new - // tablet's TabletReplica and Consensus implementation. + // tablet's TabletReplica and RaftConsensus implementation. 
unique_ptr cmeta; RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(), config, consensus::kMinimumTerm, &cmeta), @@ -645,10 +645,10 @@ Status TSTabletManager::DeleteTablet( // restarting the tablet if the local replica committed a higher config // change op during that time, or potentially something else more invasive. if (cas_config_opid_index_less_or_equal && !tablet_deleted) { - scoped_refptr consensus = replica->shared_consensus(); + scoped_refptr consensus = replica->shared_consensus(); if (!consensus) { *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; - return Status::IllegalState("Consensus not available. Tablet shutting down"); + return Status::IllegalState("Raft Consensus not available. Tablet shutting down"); } RaftConfigPB committed_config = consensus->CommittedConfig(); if (committed_config.opid_index() > *cas_config_opid_index_less_or_equal) { @@ -987,7 +987,7 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id, reported_tablet->set_schema_version(replica->tablet_metadata()->schema_version()); // We cannot get consensus state information unless the TabletReplica is running. 
- scoped_refptr consensus = replica->shared_consensus(); + scoped_refptr consensus = replica->shared_consensus(); if (consensus) { *reported_tablet->mutable_consensus_state() = consensus->ConsensusState(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tserver-path-handlers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc index 4aa9407..c4599ae 100644 --- a/src/kudu/tserver/tserver-path-handlers.cc +++ b/src/kudu/tserver/tserver-path-handlers.cc @@ -253,7 +253,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& re .PartitionDebugString(replica->tablet_metadata()->partition(), replica->tablet_metadata()->schema()); - scoped_refptr consensus = replica->shared_consensus(); + scoped_refptr consensus = replica->shared_consensus(); (*output) << Substitute( // Table name, tablet id, partition "$0$1$2" @@ -456,7 +456,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq string id; scoped_refptr replica; if (!LoadTablet(tserver_, req, &id, &replica, output)) return; - scoped_refptr consensus = replica->shared_consensus(); + scoped_refptr consensus = replica->shared_consensus(); if (!consensus) { *output << "Tablet " << EscapeForHtmlToString(id) << " not running"; return;