From: danburkert@apache.org
To: commits@kudu.apache.org
Reply-To: dev@kudu.apache.org
Date: Mon, 03 Oct 2016 21:50:36 -0000
In-Reply-To: <4120ddde18874f919cbeb58d40cb0731@git.apache.org>
References: <4120ddde18874f919cbeb58d40cb0731@git.apache.org>
Subject: [2/8] kudu git commit: consensus: properly truncate all state when aborting operations

consensus: properly truncate all state when aborting operations

This fixes a consensus bug which was causing exactly_once_writes-itest
to be slightly flaky.

The issue was the following sequence:

- a node A is a follower, and has some operations appended (e.g. 10.5
  through 10.7)
- a node B is elected for term 11, and sends node 'A' a status-only
  request containing preceding_op_id=11.6
-- node 'A' aborts operations 10.6 and 10.7
-- HOWEVER: it was not explicitly removing these operations from the
   LogCache or the Log. Removal was only happening on an actual
   operation _replacement_.
- node 'B' loses its leadership before it is able to replicate anything
  to a majority
- node 'A' gets elected for term 12
-- it calls Queue::SetLeaderMode()
-- this triggers the first requests to be sent to the peer
-- we hit a race where the first request is being constructed _before_
   the leader appends its initial NO_OP to the queue
--- because we never truncated the log cache or queue, we see operations
    10.6 and 10.7 in the queue, and send them to a follower
-- we now append the NO_OP 12.6 which replaces the aborted 10.6.

In this case, the peer that received the faulty request from the leader
may end up committing those operations, whereas the rest of the nodes
commit operations from term 12.

The fix in this patch is to explicitly truncate the queue and the
LogCache state when we are aborting operations.

WIP because it needs a few more comments.

To test, I looped exactly_once_writes-itest --gtest_filter=\*Churny\*
1000 times before and after. Without the patch[1], I got 17 failures, 16
of which were verification errors that one of the committed op terms did
not match. With the patch[2], I got 5 failures, all of which were
checksum errors while verifying the logs. Since seeing those failures, I
fixed the verifier to run only after shutting down the cluster.

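The sequence above can be illustrated with a toy model. This is only a
sketch under stated assumptions: ToyOpId and ToyLogCache are hypothetical
names invented for illustration and are not Kudu's OpId or LogCache. The
model keeps ops keyed by index and shows that, unless the cache is
truncated explicitly when ops are aborted, a reader of the cache still
sees the stale term-10 ops until a replacement op happens to be appended.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-in for an operation ID (term.index); not Kudu's OpId.
struct ToyOpId {
  int64_t term;
  int64_t index;
};

// Hypothetical, heavily simplified cache keyed by op index.
class ToyLogCache {
 public:
  // 'preceding_index' is the index of the last op before the cached range.
  explicit ToyLogCache(int64_t preceding_index)
      : next_index_(preceding_index + 1) {}

  // Appends an op. Appending at an existing index replaces that op and drops
  // everything after it ("truncation by replacement").
  void Append(ToyOpId op) {
    assert(op.index <= next_index_);
    TruncateOpsAfter(op.index - 1);
    ops_[op.index] = op;
    next_index_ = op.index + 1;
  }

  // Explicitly drops every op with index > 'index'.
  void TruncateOpsAfter(int64_t index) {
    assert(index + 1 <= next_index_);
    ops_.erase(ops_.upper_bound(index), ops_.end());
    next_index_ = index + 1;
  }

  // Returns the ops that would be sent to a peer whose last op is 'after_index'.
  std::vector<ToyOpId> OpsAfter(int64_t after_index) const {
    std::vector<ToyOpId> out;
    for (auto it = ops_.upper_bound(after_index); it != ops_.end(); ++it) {
      out.push_back(it->second);
    }
    return out;
  }

 private:
  std::map<int64_t, ToyOpId> ops_;
  int64_t next_index_;
};
```

In this model, after appending 10.5 through 10.7, OpsAfter(5) still
returns 10.6 and 10.7 until either TruncateOpsAfter(5) is called or a
replacement such as 12.6 is appended. The window between the abort and
the replacement is where the race described above would fit.
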
[1] http://dist-test.cloudera.org/job?job_id=todd.1473812577.12216
[2] http://dist-test.cloudera.org/job?job_id=todd.1473811112.9830

Change-Id: I2fb95b447991b7cadc2c403bc2596fead0bd31ad
Reviewed-on: http://gerrit.cloudera.org:8080/4409
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves
(cherry picked from commit 1eb24183a540f4e3bbbc8a399e440ecf905f6129)
Reviewed-on: http://gerrit.cloudera.org:8080/4602
Reviewed-by: Todd Lipcon
Tested-by: Dan Burkert

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c4d3fb6c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c4d3fb6c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c4d3fb6c

Branch: refs/heads/branch-1.0.x
Commit: c4d3fb6ca0f22de18e8589fb55acd90e9a1ad336
Parents: 908f020
Author: Todd Lipcon
Authored: Tue Sep 13 15:35:49 2016 -0700
Committer: Dan Burkert
Committed: Mon Oct 3 21:39:04 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc           |  10 ++
 src/kudu/consensus/consensus_queue.h            |   4 +
 src/kudu/consensus/log_cache-test.cc            |  40 +++++
 src/kudu/consensus/log_cache.cc                 |  33 ++--
 src/kudu/consensus/log_cache.h                  |  12 ++
 src/kudu/consensus/raft_consensus-test.cc       |   6 +
 src/kudu/consensus/raft_consensus.cc            |  12 +-
 src/kudu/consensus/raft_consensus.h             |   4 +
 src/kudu/consensus/raft_consensus_state.cc      |  14 +-
 src/kudu/consensus/raft_consensus_state.h       |   2 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 src/kudu/integration-tests/cluster_verifier.cc  |  10 ++
 .../exactly_once_writes-itest.cc                |   8 +
 src/kudu/integration-tests/log_verifier.cc      | 157 +++++++++++++++++++
 src/kudu/integration-tests/log_verifier.h       |  60 +++++++
 15 files changed, 351 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 228d723..0cafec4 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -292,6 +292,16 @@ Status PeerMessageQueue::AppendOperations(const vector& msgs,
   return Status::OK();
 }

+void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
+  DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append.
+
+  {
+    std::unique_lock lock(queue_lock_);
+    queue_state_.last_appended = op;
+  }
+  log_cache_.TruncateOpsAfter(op.index());
+}
+
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector* msg_refs,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 7b39a1a..43d73d7 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -181,6 +181,10 @@ class PeerMessageQueue {
   virtual Status AppendOperations(const std::vector& msgs,
                                   const StatusCallback& log_append_callback);

+  // Truncate all operations coming after 'op'. Following this, the 'last_appended'
+  // operation is reset to 'op', and the log cache will be truncated accordingly.
+  virtual void TruncateOpsAfter(const OpId& op);
+
   // Assembles a request for a peer, adding entries past 'op_id' up to
   // 'consensus_max_batch_size_bytes'.
   // Returns OK if the request was assembled, or Status::NotFound() if the

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index 2f764e0..f19783b 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -312,5 +312,45 @@ TEST_F(LogCacheTest, TestReplaceMessages) {
             cache_->ToString());
 }

+// Test that the cache truncates any future messages when either explicitly
+// truncated or replacing any earlier message.
+TEST_F(LogCacheTest, TestTruncation) {
+  enum {
+    TRUNCATE_BY_APPEND,
+    TRUNCATE_EXPLICITLY
+  };
+
+  // Append 1 through 3.
+  AppendReplicateMessagesToCache(1, 3, 100);
+
+  for (auto mode : {TRUNCATE_BY_APPEND, TRUNCATE_EXPLICITLY}) {
+    SCOPED_TRACE(mode == TRUNCATE_BY_APPEND ? "by append" : "explicitly");
+    // Append messages 4 through 10.
+    AppendReplicateMessagesToCache(4, 7, 100);
+    ASSERT_EQ(10, cache_->metrics_.log_cache_num_ops->value());
+
+    switch (mode) {
+      case TRUNCATE_BY_APPEND:
+        AppendReplicateMessagesToCache(3, 1, 100);
+        break;
+      case TRUNCATE_EXPLICITLY:
+        cache_->TruncateOpsAfter(3);
+        break;
+    }
+
+    ASSERT_EQ(3, cache_->metrics_.log_cache_num_ops->value());
+
+    // Op 3 should still be in the cache.
+    OpId op;
+    ASSERT_OK(cache_->LookupOpId(3, &op));
+    ASSERT_TRUE(cache_->HasOpBeenWritten(3));
+
+    // Op 4 should have been removed.
+    Status s = cache_->LookupOpId(4, &op);
+    ASSERT_TRUE(s.IsIncomplete()) << "should be truncated, but got: " << s.ToString();
+    ASSERT_FALSE(cache_->HasOpBeenWritten(4));
+  }
+}
+
 } // namespace consensus
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 914d3da..25ecd1c 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -114,6 +114,27 @@ void LogCache::Init(const OpId& preceding_op) {
   min_pinned_op_index_ = next_sequential_op_index_;
 }

+void LogCache::TruncateOpsAfter(int64_t index) {
+  std::unique_lock l(lock_);
+  TruncateOpsAfterUnlocked(index);
+}
+
+void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
+  int64_t first_to_truncate = index + 1;
+  // If the index is not consecutive then it must be lower than or equal
+  // to the last index, i.e. we're overwriting.
+  CHECK_LE(first_to_truncate, next_sequential_op_index_);
+
+  // Now remove the overwritten operations.
+  for (int64_t i = first_to_truncate; i < next_sequential_op_index_; ++i) {
+    ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
+    if (msg != nullptr) {
+      AccountForMessageRemovalUnlocked(msg);
+    }
+  }
+  next_sequential_op_index_ = index + 1;
+}
+
 Status LogCache::AppendOperations(const vector& msgs,
                                   const StatusCallback& callback) {
   std::unique_lock l(lock_);
@@ -127,17 +148,7 @@ Status LogCache::AppendOperations(const vector& msgs,
   int64_t last_idx_in_batch = msgs.back()->get()->id().index();

   if (first_idx_in_batch != next_sequential_op_index_) {
-    // If the index is not consecutive then it must be lower than or equal
-    // to the last index, i.e. we're overwriting.
-    CHECK_LE(first_idx_in_batch, next_sequential_op_index_);
-
-    // Now remove the overwritten operations.
-    for (int64_t i = first_idx_in_batch; i < next_sequential_op_index_; ++i) {
-      ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
-      if (msg != nullptr) {
-        AccountForMessageRemovalUnlocked(msg);
-      }
-    }
+    TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
   }


http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index ba6739c..77f4d3d 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -97,6 +97,15 @@ class LogCache {
   Status AppendOperations(const std::vector& msgs,
                           const StatusCallback& callback);

+  // Truncate any operations with index > 'index'.
+  //
+  // Following this, reads of truncated indexes using ReadOps(), LookupOpId(),
+  // HasOpBeenWritten(), etc, will return as if the operations were never appended.
+  //
+  // NOTE: unless a new operation is appended following 'index', this truncation does
+  // not persist across server restarts.
+  void TruncateOpsAfter(int64_t index);
+
   // Return true if an operation with the given index has been written through
   // the cache. The operation may not necessarily be durable yet -- it could still be
   // en route to the log.
@@ -137,6 +146,7 @@ class LogCache {
   FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages);
   FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit);
   FRIEND_TEST(LogCacheTest, TestReplaceMessages);
+  FRIEND_TEST(LogCacheTest, TestTruncation);
   friend class LogCacheTest;

   // Try to evict the oldest operations from the queue, stopping either when
@@ -148,6 +158,8 @@ class LogCache {
   // given message.
   void AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg);

+  void TruncateOpsAfterUnlocked(int64_t index);
+
   // Return a string with stats
   std::string StatsStringUnlocked() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index cbbf333..0aa77b1 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -78,6 +78,7 @@ class MockQueue : public PeerMessageQueue {
   }
   MOCK_METHOD2(AppendOperationsMock, Status(const vector& msgs,
                                             const StatusCallback& callback));
+  MOCK_METHOD1(TruncateOpsAfter, void(const OpId& op_id));
   MOCK_METHOD1(TrackPeer, void(const string&));
   MOCK_METHOD1(UntrackPeer, void(const string&));
   MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid,
@@ -432,6 +433,10 @@ MATCHER_P2(RoundHasOpId, term, index, "") {
   return arg->id().term() == term && arg->id().index() == index;
 }

+MATCHER_P2(EqOpId, term, index, "") {
+  return arg.term() == term && arg.index() == index;
+}
+
 // Tests the case where a a leader is elected and pushed a sequence of
 // operations of which some never get committed. Eventually a new leader in a higher
 // term pushes operations that overwrite some of the original indexes.
@@ -488,6 +493,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
   }
   EXPECT_CALL(*consensus_.get(),
               NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);
+  EXPECT_CALL(*queue_, TruncateOpsAfter(EqOpId(2, 5))).Times(1);

   // Nothing's committed so far, so now just send an Update() message
   // emulating another guy got elected leader and is overwriting a suffix

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 534a594..5e88be7 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -868,12 +868,20 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
   // why this is actually critical to do here, as opposed to just on requests that
   // append some ops.
   if (term_mismatch) {
-    return state_->AbortOpsAfterUnlocked(req.preceding_opid->index() - 1);
+    TruncateAndAbortOpsAfterUnlocked(req.preceding_opid->index() - 1);
   }

   return Status::OK();
 }

+void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
+  state_->AbortOpsAfterUnlocked(truncate_after_index);
+  // Above resets the 'last received' to the operation with index 'truncate_after_index'.
+  OpId new_last_received = state_->GetLastReceivedOpIdUnlocked();
+  DCHECK_EQ(truncate_after_index, new_last_received.index());
+  queue_->TruncateOpsAfter(new_last_received);
+}
+
 Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                                  ConsensusResponsePB* response,
                                                  LeaderRequest* deduped_req) {
@@ -942,7 +950,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
     // If the index is in our log but the terms are not the same abort down to the leader's
     // preceding id.
     if (term_mismatch) {
-      RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_opid->index()));
+      TruncateAndAbortOpsAfterUnlocked(deduped_req->preceding_opid->index());
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 02d6593..d0f9fe0 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -284,6 +284,10 @@ class RaftConsensus : public Consensus,
                                     ConsensusResponsePB* response,
                                     LeaderRequest* deduped_req);

+  // Abort any pending operations after the given op index,
+  // and also truncate the LogCache accordingly.
+  void TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index);
+
   // Pushes a new Raft configuration to a majority of peers. Contrary to write operations,
   // this actually waits for the commit round to reach a majority of peers, so that we know
   // we can proceed. If this returns Status::OK(), a majority of peers have accepted the new

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 394d6e5..92222bf 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -382,23 +382,23 @@ void ReplicaState::GetUncommittedPendingOperationsUnlocked(
   }
 }

-Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
+void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
   DCHECK(update_lock_.is_locked());
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting all transactions after (but not including): "
-      << new_preceding_idx << ". Current State: " << ToStringUnlocked();
+      << index << ". Current State: " << ToStringUnlocked();

-  DCHECK_GE(new_preceding_idx, 0);
+  DCHECK_GE(index, 0);
   OpId new_preceding;

-  auto iter = pending_txns_.lower_bound(new_preceding_idx);
+  auto iter = pending_txns_.lower_bound(index);

   // Either the new preceding id is in the pendings set or it must be equal to the
   // committed index since we can't truncate already committed operations.
-  if (iter != pending_txns_.end() && (*iter).first == new_preceding_idx) {
+  if (iter != pending_txns_.end() && (*iter).first == index) {
     new_preceding = (*iter).second->replicate_msg()->id();
     ++iter;
   } else {
-    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index());
+    CHECK_EQ(index, last_committed_op_id_.index());
     new_preceding = last_committed_op_id_;
   }

@@ -429,8 +429,6 @@ Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
     // Erase the entry from pendings.
     pending_txns_.erase(iter++);
   }
-
-  return Status::OK();
 }

 Status ReplicaState::AddPendingOperation(const scoped_refptr& round) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index aa30ead..efcaff2 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -232,7 +232,7 @@ class ReplicaState {
   // Aborts pending operations after, but not including 'index'. The OpId with 'index'
   // will become our new last received id. If there are pending operations with indexes
   // higher than 'index' those operations are aborted.
-  Status AbortOpsAfterUnlocked(int64_t index);
+  void AbortOpsAfterUnlocked(int64_t index);

   // Returns the the ConsensusRound with the provided index, if there is any, or NULL
   // if there isn't.

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index b786ecf..dad1533 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -20,6 +20,7 @@ set(INTEGRATION_TESTS_SRCS
   cluster_verifier.cc
   external_mini_cluster.cc
   external_mini_cluster_fs_inspector.cc
+  log_verifier.cc
   mini_cluster.cc
   test_workload.cc
 )

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index 4617b54..fd448d6 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -24,6 +24,7 @@
 #include "kudu/client/row_result.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/log_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
 #include "kudu/tools/ksck_remote.h"
 #include "kudu/util/monotime.h"
@@ -74,6 +75,15 @@ void ClusterVerifier::CheckCluster() {
     SleepFor(MonoDelta::FromSeconds(sleep_time));
   }
   ASSERT_OK(s);
+
+  // Verify that the committed op indexes match up across the servers.
+  // We have to use "AssertEventually" here because many tests verify clusters
+  // while they are still running, and the verification can fail spuriously in
+  // the case that
+  LogVerifier lv(cluster_);
+  AssertEventually([&]() {
+    ASSERT_OK(lv.VerifyCommittedOpIdsMatch());
+  });
 }

 Status ClusterVerifier::DoKsck() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index f74da83..c2eed00 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+#include "kudu/integration-tests/log_verifier.h"
 #include "kudu/integration-tests/ts_itest-base.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/logging.h"
@@ -223,6 +224,13 @@ void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
   if (mismatched) {
     FAIL() << "Got mismatched responses";
   }
+
+  // Check that the servers have matching commit indexes. We shut down first because otherwise
+  // they keep appending to the logs, and the verifier can hit checksum issues trying to
+  // read from a log which is in the process of being written.
+  cluster_->Shutdown();
+  LogVerifier lv(cluster_.get());
+  ASSERT_OK(lv.VerifyCommittedOpIdsMatch());
 }

 // This tests exactly once semantics by starting a cluster with multiple replicas and attempting

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/log_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/log_verifier.cc b/src/kudu/integration-tests/log_verifier.cc
new file mode 100644
index 0000000..cbd1933
--- /dev/null
+++ b/src/kudu/integration-tests/log_verifier.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/log_verifier.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include "kudu/consensus/log_index.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+
+using std::map;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+using log::LogReader;
+using itest::ExternalMiniClusterFsInspector;
+
+LogVerifier::LogVerifier(ExternalMiniCluster* cluster)
+    : cluster_(cluster) {
+}
+
+LogVerifier::~LogVerifier() {
+}
+
+Status LogVerifier::ScanForCommittedOpIds(FsManager* fs, const string& tablet_id,
+                                          map* index_to_term) {
+
+  shared_ptr reader;
+  RETURN_NOT_OK(LogReader::Open(fs, scoped_refptr(), tablet_id,
+                                scoped_refptr(), &reader));
+  log::SegmentSequence segs;
+  RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
+  log::LogEntryPB entry;
+  for (const auto& seg : segs) {
+    log::LogEntryReader reader(seg.get());
+    while (true) {
+      Status s = reader.ReadNextEntry(&entry);
+      if (s.IsEndOfFile() || s.IsCorruption()) break;
+      RETURN_NOT_OK(s);
+      if (entry.type() != log::COMMIT) continue;
+      const auto& op_id = entry.commit().commited_op_id();
+
+      if (!InsertIfNotPresent(index_to_term, op_id.index(), op_id.term())) {
+        return Status::Corruption(Substitute(
+            "Index $0 had two COMMIT messages: $1.$0 and $2.$0",
+            op_id.index(), op_id.term(), (*index_to_term)[op_id.index()]));
+      }
+    }
+  }
+
+  return Status::OK();
+}
+
+Status LogVerifier::VerifyCommittedOpIdsMatch() {
+  ExternalMiniClusterFsInspector inspect(cluster_);
+  Env* env = Env::Default();
+
+  for (const string& tablet_id : inspect.ListTablets()) {
+    LOG(INFO) << "Checking tablet " << tablet_id;
+
+    // Union set of the op indexes seen on any server.
+    set all_op_indexes;
+    // For each server in the cluster, a map of [index->term].
+    vector> maps_by_ts(cluster_->num_tablet_servers());
+
+    // Gather the [index->term] map for each of the tablet servers
+    // hosting this tablet.
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      const string& data_dir = cluster_->tablet_server(i)->data_dir();
+
+      FsManagerOpts fs_opts;
+      fs_opts.read_only = true;
+      fs_opts.wal_path = data_dir;
+      FsManager fs(env, fs_opts);
+      RETURN_NOT_OK_PREPEND(fs.Open(),
+                            Substitute("Couldn't initialize FS Manager for $0", data_dir));
+      const string& wal_dir = fs.GetTabletWalDir(tablet_id);
+      if (!env->FileExists(wal_dir)) continue;
+      map index_to_term;
+      RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(&fs, tablet_id, &index_to_term),
+                            Substitute("Couldn't scan log for TS $0", i));
+      for (const auto& index_term : index_to_term) {
+        all_op_indexes.insert(index_term.first);
+      }
+      maps_by_ts[i] = std::move(index_to_term);
+    }
+
+    // Check that the terms match up across servers.
+    vector committed_terms;
+    // Indicates that the op is not on this server.
+    const int64_t kNotOnThisServer = -1;
+    for (int64_t index : all_op_indexes) {
+      committed_terms.clear();
+      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
+        committed_terms.push_back(FindWithDefault(maps_by_ts[ts], index, kNotOnThisServer));
+      }
+      // 'committed_terms' entries should all be kNotOnThisServer or the same as each other.
+      boost::optional expected_term;
+      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
+        int64_t this_ts_term = committed_terms[ts];
+        if (this_ts_term == kNotOnThisServer) continue; // this TS doesn't have the op
+        if (expected_term == boost::none) {
+          expected_term = this_ts_term;
+        } else if (this_ts_term != expected_term) {
+          string err = Substitute("Mismatch found for index $0, [", index);
+          for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+            if (i != 0) err += ", ";
+            strings::SubstituteAndAppend(&err, "T $0=$1",
+                                         cluster_->tablet_server(i)->uuid(),
+                                         committed_terms[i]);
+          }
+          err += "]";
+          return Status::Corruption(err);
+        }
+      }
+    }
+
+    LOG(INFO) << "Verified matching terms for " << all_op_indexes.size() << " ops in tablet "
+              << tablet_id;
+  }
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/log_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/log_verifier.h b/src/kudu/integration-tests/log_verifier.h
new file mode 100644
index 0000000..cb33958
--- /dev/null
+++ b/src/kudu/integration-tests/log_verifier.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include
+#include
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class ExternalMiniCluster;
+class FsManager;
+
+// Verifies correctness of the logs in an external mini-cluster.
+class LogVerifier {
+ public:
+  explicit LogVerifier(ExternalMiniCluster* cluster);
+  ~LogVerifier();
+
+  // Verify that, for every tablet in the cluster, the logs of each of that tablet's replicas
+  // have matching committed operations. In other words, if any replica has a log entry
+  // 'COMMIT term.index', then verifies that no other replica has a COMMIT entry for the
+  // same index with a different term.
+  //
+  // This is the most basic correctness condition of Raft: all replicas should commit the
+  // same operations.
+  //
+  // NOTE: if the cluster is not shut down, it is possible for this method to fail spuriously
+  // trying to read a WAL that is currently being written. In this case, it's advisable to
+  // loop and retry on failure.
+  Status VerifyCommittedOpIdsMatch();
+
+ private:
+  // Scan the WALs for tablet 'tablet_id' on the given 'fs'. Sets entries
+  // in '*index_to_term' for each COMMIT entry found in the WALs.
+  Status ScanForCommittedOpIds(FsManager* fs, const std::string& tablet_id,
+                               std::map* index_to_term);
+
+  ExternalMiniCluster* const cluster_;
+
+  DISALLOW_COPY_AND_ASSIGN(LogVerifier);
+};
+
+} // namespace kudu
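
The condition that VerifyCommittedOpIdsMatch() checks can be restated
without any Kudu dependencies. The following is a minimal sketch, not the
code from this patch: IndexToTerm, kNoMismatch and FirstMismatchedIndex
are hypothetical names chosen for illustration, and each replica's WAL is
assumed to have already been reduced to a map from committed index to
term.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <vector>

// Hypothetical alias: committed op index -> term, for a single replica.
using IndexToTerm = std::map<int64_t, int64_t>;

// Hypothetical sentinel meaning "all replicas agree".
const int64_t kNoMismatch = -1;

// Returns the lowest index that two replicas committed with different terms,
// or kNoMismatch if every index present on several replicas has one term.
// A replica that lacks an index entirely is not treated as a mismatch.
int64_t FirstMismatchedIndex(const std::vector<IndexToTerm>& replicas) {
  // Union of the indexes seen on any replica.
  std::set<int64_t> all_indexes;
  for (const IndexToTerm& m : replicas) {
    for (const auto& entry : m) {
      all_indexes.insert(entry.first);
    }
  }

  for (int64_t index : all_indexes) {
    bool have_expected = false;
    int64_t expected_term = 0;
    for (const IndexToTerm& m : replicas) {
      auto it = m.find(index);
      if (it == m.end()) continue;  // this replica does not have the op
      if (!have_expected) {
        have_expected = true;
        expected_term = it->second;
      } else if (it->second != expected_term) {
        return index;
      }
    }
  }
  return kNoMismatch;
}
```

Applied to the scenario in the commit message, a replica that committed
10.6 alongside replicas that committed 12.6 would be reported with a
mismatch at index 6.
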