Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CD386200C92 for ; Fri, 5 May 2017 00:14:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CBBA5160BC4; Thu, 4 May 2017 22:14:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 52AB7160BC6 for ; Fri, 5 May 2017 00:14:27 +0200 (CEST) Received: (qmail 6595 invoked by uid 500); 4 May 2017 22:14:26 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 6531 invoked by uid 99); 4 May 2017 22:14:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 May 2017 22:14:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DB3CE04AA; Thu, 4 May 2017 22:14:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@kudu.apache.org Date: Thu, 04 May 2017 22:14:29 -0000 Message-Id: In-Reply-To: <0317384e90114e49aacabcc070a379a1@git.apache.org> References: <0317384e90114e49aacabcc070a379a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] kudu git commit: Rename TabletPeer to TabletReplica archived-at: Thu, 04 May 2017 22:14:30 -0000 http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc deleted file mode 100644 index 95c7d0c..0000000 --- a/src/kudu/tablet/tablet_peer-test.cc +++ /dev/null @@ -1,564 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "kudu/common/partial_row.h" -#include "kudu/common/timestamp.h" -#include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol-test-util.h" -#include "kudu/consensus/consensus_meta.h" -#include "kudu/consensus/log.h" -#include "kudu/consensus/log_reader.h" -#include "kudu/consensus/log_util.h" -#include "kudu/consensus/opid_util.h" -#include "kudu/gutil/gscoped_ptr.h" -#include "kudu/gutil/macros.h" -#include "kudu/rpc/messenger.h" -#include "kudu/server/clock.h" -#include "kudu/server/logical_clock.h" -#include "kudu/tablet/transactions/transaction.h" -#include "kudu/tablet/transactions/transaction_driver.h" -#include "kudu/tablet/transactions/write_transaction.h" -#include "kudu/tablet/tablet_peer.h" -#include "kudu/tablet/tablet_peer_mm_ops.h" -#include "kudu/tablet/tablet-test-util.h" -#include "kudu/tserver/tserver.pb.h" -#include "kudu/util/maintenance_manager.h" -#include "kudu/util/metrics.h" -#include "kudu/util/pb_util.h" -#include "kudu/util/test_macros.h" -#include "kudu/util/test_util.h" -#include "kudu/util/threadpool.h" - -METRIC_DECLARE_entity(tablet); - -DECLARE_int32(flush_threshold_mb); - -namespace kudu { -namespace tablet { - -using consensus::CommitMsg; -using consensus::Consensus; -using consensus::ConsensusBootstrapInfo; -using consensus::ConsensusMetadata; -using consensus::MakeOpId; -using consensus::MinimumOpId; -using consensus::OpId; -using consensus::OpIdEquals; -using consensus::RaftPeerPB; -using consensus::WRITE_OP; -using log::Log; -using log::LogAnchorRegistry; -using log::LogOptions; -using rpc::Messenger; -using server::Clock; -using server::LogicalClock; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using strings::Substitute; -using tserver::WriteRequestPB; -using tserver::WriteResponsePB; - -static Schema GetTestSchema() { - return Schema({ ColumnSchema("key", INT32) }, 1); -} - -class TabletPeerTest : public KuduTabletTest { - public: - TabletPeerTest() - : KuduTabletTest(GetTestSchema()), - insert_counter_(0), - delete_counter_(0) { - } - - virtual void SetUp() OVERRIDE { - KuduTabletTest::SetUp(); - - ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_)); - - rpc::MessengerBuilder builder(CURRENT_TEST_NAME()); - ASSERT_OK(builder.Build(&messenger_)); - - metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet"); - - RaftPeerPB config_peer; - config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid()); - config_peer.mutable_last_known_addr()->set_host("0.0.0.0"); - config_peer.mutable_last_known_addr()->set_port(0); - config_peer.set_member_type(RaftPeerPB::VOTER); - - // "Bootstrap" and start the TabletPeer. - tablet_peer_.reset( - new TabletPeer(make_scoped_refptr(tablet()->metadata()), - config_peer, - apply_pool_.get(), - Bind(&TabletPeerTest::TabletPeerStateChangedCallback, - Unretained(this), - tablet()->tablet_id()))); - - // Make TabletPeer use the same LogAnchorRegistry as the Tablet created by the harness. - // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing - // TabletMetadata for consumption by TabletPeer before Tablet is instantiated. - tablet_peer_->log_anchor_registry_ = tablet()->log_anchor_registry_; - - RaftConfigPB config; - config.add_peers()->CopyFrom(config_peer); - config.set_opid_index(consensus::kInvalidOpIdIndex); - - unique_ptr cmeta; - ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(), - tablet()->tablet_id(), - tablet()->metadata()->fs_manager()->uuid(), - config, - consensus::kMinimumTerm, &cmeta)); - - scoped_refptr log; - ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(), - *tablet()->schema(), tablet()->metadata()->schema_version(), - metric_entity_.get(), &log)); - - tablet_peer_->SetBootstrapping(); - ASSERT_OK(tablet_peer_->Init(tablet(), - clock(), - messenger_, - scoped_refptr(), - log, - metric_entity_)); - } - - Status StartPeer(const ConsensusBootstrapInfo& info) { - RETURN_NOT_OK(tablet_peer_->Start(info)); - RETURN_NOT_OK(tablet_peer_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); - return Status::OK(); - } - - void TabletPeerStateChangedCallback(const string& tablet_id, const string& reason) { - LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id << ". Reason: " << reason; - } - - virtual void TearDown() OVERRIDE { - tablet_peer_->Shutdown(); - apply_pool_->Shutdown(); - KuduTabletTest::TearDown(); - } - - protected: - // Generate monotonic sequence of key column integers. - Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) { - Schema schema(GetTestSchema()); - write_req->set_tablet_id(tablet()->tablet_id()); - CHECK_OK(SchemaToPB(schema, write_req->mutable_schema())); - - KuduPartialRow row(&schema); - CHECK_OK(row.SetInt32("key", insert_counter_++)); - - RowOperationsPBEncoder enc(write_req->mutable_row_operations()); - enc.Add(RowOperationsPB::INSERT, row); - return Status::OK(); - } - - // Generate monotonic sequence of deletions, starting with 0. - // Will assert if you try to delete more rows than you inserted. - Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) { - CHECK_LT(delete_counter_, insert_counter_); - Schema schema(GetTestSchema()); - write_req->set_tablet_id(tablet()->tablet_id()); - CHECK_OK(SchemaToPB(schema, write_req->mutable_schema())); - - KuduPartialRow row(&schema); - CHECK_OK(row.SetInt32("key", delete_counter_++)); - - RowOperationsPBEncoder enc(write_req->mutable_row_operations()); - enc.Add(RowOperationsPB::DELETE, row); - return Status::OK(); - } - - Status ExecuteWriteAndRollLog(TabletPeer* tablet_peer, const WriteRequestPB& req) { - gscoped_ptr resp(new WriteResponsePB()); - unique_ptr tx_state(new WriteTransactionState(tablet_peer, - &req, - nullptr, // No RequestIdPB - resp.get())); - - CountDownLatch rpc_latch(1); - tx_state->set_completion_callback(gscoped_ptr( - new LatchTransactionCompletionCallback(&rpc_latch, resp.get()))); - - CHECK_OK(tablet_peer->SubmitWrite(std::move(tx_state))); - rpc_latch.Wait(); - CHECK(!resp->has_error()) - << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp); - - // Roll the log after each write. - // Usually the append thread does the roll and no additional sync is required. However in - // this test the thread that is appending is not the same thread that is rolling the log - // so we must make sure the Log's queue is flushed before we roll or we might have a race - // between the appender thread and the thread executing the test. - CHECK_OK(tablet_peer->log_->WaitUntilAllFlushed()); - CHECK_OK(tablet_peer->log_->AllocateSegmentAndRollOver()); - return Status::OK(); - } - - // Execute insert requests and roll log after each one. - Status ExecuteInsertsAndRollLogs(int num_inserts) { - for (int i = 0; i < num_inserts; i++) { - gscoped_ptr req(new WriteRequestPB()); - RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get())); - RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req)); - } - - return Status::OK(); - } - - // Execute delete requests and roll log after each one. - Status ExecuteDeletesAndRollLogs(int num_deletes) { - for (int i = 0; i < num_deletes; i++) { - gscoped_ptr req(new WriteRequestPB()); - CHECK_OK(GenerateSequentialDeleteRequest(req.get())); - CHECK_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req)); - } - - return Status::OK(); - } - - void AssertNoLogAnchors() { - // Make sure that there are no registered anchors in the registry - CHECK_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests()); - } - - // Assert that the Log GC() anchor is earlier than the latest OpId in the Log. - void AssertLogAnchorEarlierThanLogLatest() { - log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes(); - OpId last_log_opid; - tablet_peer_->log_->GetLatestEntryOpId(&last_log_opid); - CHECK_LT(retention.for_durability, last_log_opid.index()) - << "Expected valid log anchor, got earliest opid: " << retention.for_durability - << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid) - << ")"; - } - - // We disable automatic log GC. Don't leak those changes. - google::FlagSaver flag_saver_; - - int32_t insert_counter_; - int32_t delete_counter_; - MetricRegistry metric_registry_; - scoped_refptr metric_entity_; - shared_ptr messenger_; - scoped_refptr tablet_peer_; - gscoped_ptr apply_pool_; -}; - -// A Transaction that waits on the apply_continue latch inside of Apply(). -class DelayedApplyTransaction : public WriteTransaction { - public: - DelayedApplyTransaction(CountDownLatch* apply_started, - CountDownLatch* apply_continue, - unique_ptr state) - : WriteTransaction(std::move(state), consensus::LEADER), - apply_started_(DCHECK_NOTNULL(apply_started)), - apply_continue_(DCHECK_NOTNULL(apply_continue)) { - } - - virtual Status Apply(gscoped_ptr* commit_msg) OVERRIDE { - apply_started_->CountDown(); - LOG(INFO) << "Delaying apply..."; - apply_continue_->Wait(); - LOG(INFO) << "Apply proceeding"; - return WriteTransaction::Apply(commit_msg); - } - - private: - CountDownLatch* apply_started_; - CountDownLatch* apply_continue_; - DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction); -}; - -// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor. -TEST_F(TabletPeerTest, TestMRSAnchorPreventsLogGC) { - ConsensusBootstrapInfo info; - ASSERT_OK(StartPeer(info)); - - Log* log = tablet_peer_->log_.get(); - int32_t num_gced; - - AssertNoLogAnchors(); - - log::SegmentSequence segments; - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - - ASSERT_EQ(1, segments.size()); - ASSERT_OK(ExecuteInsertsAndRollLogs(3)); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(4, segments.size()); - - AssertLogAnchorEarlierThanLogLatest(); - ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0); - - // Ensure nothing gets deleted. - log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability; - - // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS. - tablet_peer_->tablet()->Flush(); - AssertNoLogAnchors(); - - // The first two segments should be deleted. - // The last is anchored due to the commit in the last segment being the last - // OpId in the log. - retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability; - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(2, segments.size()); -} - -// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor. -TEST_F(TabletPeerTest, TestDMSAnchorPreventsLogGC) { - ConsensusBootstrapInfo info; - ASSERT_OK(StartPeer(info)); - - Log* log = tablet_peer_->log_.get(); - int32_t num_gced; - - AssertNoLogAnchors(); - - log::SegmentSequence segments; - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - - ASSERT_EQ(1, segments.size()); - ASSERT_OK(ExecuteInsertsAndRollLogs(2)); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(3, segments.size()); - - // Flush MRS & GC log so the next mutation goes into a DMS. - ASSERT_OK(tablet_peer_->tablet()->Flush()); - log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - // We will only GC 1, and have 1 left because the earliest needed OpId falls - // back to the latest OpId written to the Log if no anchors are set. - ASSERT_EQ(1, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(2, segments.size()); - AssertNoLogAnchors(); - - OpId id; - log->GetLatestEntryOpId(&id); - LOG(INFO) << "Before: " << SecureShortDebugString(id); - - - // We currently have no anchors and the last operation in the log is 0.3 - // Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking - // what I think is a wrong assertion. - // I.e. since 0.4 is the last operation that we know is in memory 0.4 is the - // last anchor we expect _and_ it's the last op in the log. - // Only if we apply two operations is the last anchored operation and the - // last operation in the log different. - - // Execute a mutation. - ASSERT_OK(ExecuteDeletesAndRollLogs(2)); - AssertLogAnchorEarlierThanLogLatest(); - ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(4, segments.size()); - - // Execute another couple inserts, but Flush it so it doesn't anchor. - ASSERT_OK(ExecuteInsertsAndRollLogs(2)); - ASSERT_OK(tablet_peer_->tablet()->Flush()); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(6, segments.size()); - - // Ensure the delta and last insert remain in the logs, anchored by the delta. - // Note that this will allow GC of the 2nd insert done above. - retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(1, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(5, segments.size()); - - // Flush DMS to release the anchor. - tablet_peer_->tablet()->FlushBiggestDMS(); - - // Verify no anchors after Flush(). - AssertNoLogAnchors(); - - // We should only hang onto one segment due to no anchors. - // The last log OpId is the commit in the last segment, so it only anchors - // that segment, not the previous, because it's not the first OpId in the - // segment. - retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(3, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(2, segments.size()); -} - -// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions. -TEST_F(TabletPeerTest, TestActiveTransactionPreventsLogGC) { - ConsensusBootstrapInfo info; - ASSERT_OK(StartPeer(info)); - - Log* log = tablet_peer_->log_.get(); - int32_t num_gced; - - AssertNoLogAnchors(); - - log::SegmentSequence segments; - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - - ASSERT_EQ(1, segments.size()); - ASSERT_OK(ExecuteInsertsAndRollLogs(4)); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(5, segments.size()); - - // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS. - ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests()); - tablet_peer_->tablet()->Flush(); - - // Verify no anchors after Flush(). - AssertNoLogAnchors(); - - // Now create a long-lived Transaction that hangs during Apply(). - // Allow other transactions to go through. Logs should be populated, but the - // long-lived Transaction should prevent the log from being deleted since it - // is in-flight. - CountDownLatch rpc_latch(1); - CountDownLatch apply_started(1); - CountDownLatch apply_continue(1); - gscoped_ptr req(new WriteRequestPB()); - gscoped_ptr resp(new WriteResponsePB()); - { - // Long-running mutation. - ASSERT_OK(GenerateSequentialDeleteRequest(req.get())); - unique_ptr tx_state(new WriteTransactionState(tablet_peer_.get(), - req.get(), - nullptr, // No RequestIdPB - resp.get())); - - tx_state->set_completion_callback(gscoped_ptr( - new LatchTransactionCompletionCallback(&rpc_latch, resp.get()))); - - gscoped_ptr transaction( - new DelayedApplyTransaction(&apply_started, - &apply_continue, - std::move(tx_state))); - - scoped_refptr driver; - ASSERT_OK(tablet_peer_->NewLeaderTransactionDriver(transaction.PassAs(), - &driver)); - - ASSERT_OK(driver->ExecuteAsync()); - apply_started.Wait(); - ASSERT_TRUE(driver->GetOpId().IsInitialized()) - << "By the time a transaction is applied, it should have an Opid"; - // The apply will hang until we CountDown() the continue latch. - // Now, roll the log. Below, we execute a few more insertions with rolling. - ASSERT_OK(log->AllocateSegmentAndRollOver()); - } - - ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests()); - // The log anchor is currently equal to the latest OpId written to the Log - // because we are delaying the Commit message with the CountDownLatch. - - // GC the first four segments created by the inserts. - log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(4, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(2, segments.size()); - - // We use mutations here, since an MRS Flush() quiesces the tablet, and we - // want to ensure the only thing "anchoring" is the TransactionTracker. - ASSERT_OK(ExecuteDeletesAndRollLogs(3)); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(5, segments.size()); - ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests()); - tablet_peer_->tablet()->FlushBiggestDMS(); - ASSERT_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests()); - ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests()); - - AssertLogAnchorEarlierThanLogLatest(); - - // Try to GC(), nothing should be deleted due to the in-flight transaction. - retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(0, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(5, segments.size()); - - // Now we release the transaction and wait for everything to complete. - // We fully quiesce and flush, which should release all anchors. - ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests()); - apply_continue.CountDown(); - rpc_latch.Wait(); - tablet_peer_->txn_tracker_.WaitForAllToFinish(); - ASSERT_EQ(0, tablet_peer_->txn_tracker_.GetNumPendingForTests()); - tablet_peer_->tablet()->FlushBiggestDMS(); - AssertNoLogAnchors(); - - // All should be deleted except the two last segments. - retention = tablet_peer_->GetRetentionIndexes(); - ASSERT_OK(log->GC(retention, &num_gced)); - ASSERT_EQ(3, num_gced); - ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(2, segments.size()); -} - -TEST_F(TabletPeerTest, TestGCEmptyLog) { - ConsensusBootstrapInfo info; - tablet_peer_->Start(info); - // We don't wait on consensus on purpose. - ASSERT_OK(tablet_peer_->RunLogGC()); -} - -TEST_F(TabletPeerTest, TestFlushOpsPerfImprovements) { - FLAGS_flush_threshold_mb = 64; - - MaintenanceOpStats stats; - - // Just on the threshold and not enough time has passed for a time-based flush. - stats.set_ram_anchored(64 * 1024 * 1024); - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1); - ASSERT_EQ(0.0, stats.perf_improvement()); - stats.Clear(); - - // Just on the threshold and enough time has passed, we'll have a low improvement. - stats.set_ram_anchored(64 * 1024 * 1024); - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000); - ASSERT_GT(stats.perf_improvement(), 0.01); - stats.Clear(); - - // Way over the threshold, number is much higher than 1. - stats.set_ram_anchored(128 * 1024 * 1024); - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1); - ASSERT_LT(1.0, stats.perf_improvement()); - stats.Clear(); - - // Below the threshold but have been there a long time, closing in to 1.0. - stats.set_ram_anchored(30 * 1024 * 1024); - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 60 * 50 * 1000); - ASSERT_LT(0.7, stats.perf_improvement()); - ASSERT_GT(1.0, stats.perf_improvement()); - stats.Clear(); -} - -} // namespace tablet -} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc deleted file mode 100644 index d9bddb4..0000000 --- a/src/kudu/tablet/tablet_peer.cc +++ /dev/null @@ -1,672 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "kudu/tablet/tablet_peer.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "kudu/consensus/consensus.h" -#include "kudu/consensus/consensus_meta.h" -#include "kudu/consensus/log.h" -#include "kudu/consensus/log_util.h" -#include "kudu/consensus/opid_util.h" -#include "kudu/consensus/log_anchor_registry.h" -#include "kudu/consensus/quorum_util.h" -#include "kudu/consensus/raft_consensus.h" -#include "kudu/gutil/mathlimits.h" -#include "kudu/gutil/stl_util.h" -#include "kudu/gutil/strings/substitute.h" -#include "kudu/gutil/sysinfo.h" -#include "kudu/rpc/messenger.h" -#include "kudu/rpc/remote_method.h" -#include "kudu/rpc/rpc_service.h" -#include "kudu/rpc/service_pool.h" -#include "kudu/tablet/transactions/transaction_driver.h" -#include "kudu/tablet/transactions/alter_schema_transaction.h" -#include "kudu/tablet/transactions/write_transaction.h" -#include "kudu/tablet/tablet_bootstrap.h" -#include "kudu/tablet/tablet_metrics.h" -#include "kudu/tablet/tablet_peer_mm_ops.h" -#include "kudu/tablet/tablet.pb.h" -#include "kudu/util/logging.h" -#include "kudu/util/metrics.h" -#include "kudu/util/pb_util.h" -#include "kudu/util/stopwatch.h" -#include "kudu/util/threadpool.h" -#include "kudu/util/trace.h" - -using std::map; -using std::shared_ptr; -using std::unique_ptr; - -namespace kudu { -namespace tablet { - -METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length", - MetricUnit::kTasks, - "Number of operations waiting to be prepared within this tablet. " - "High queue lengths indicate that the server is unable to process " - "operations as fast as they are being written to the WAL.", - 10000, 2); - -METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time", - MetricUnit::kMicroseconds, - "Time that operations spent waiting in the prepare queue before being " - "processed. High queue times indicate that the server is unable to " - "process operations as fast as they are being written to the WAL.", - 10000000, 2); - -METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time", - MetricUnit::kMicroseconds, - "Time that operations spent being prepared in the tablet. " - "High values may indicate that the server is under-provisioned or " - "that operations are experiencing high contention with one another for " - "locks.", - 10000000, 2); - -using consensus::Consensus; -using consensus::ConsensusBootstrapInfo; -using consensus::ConsensusMetadata; -using consensus::ConsensusOptions; -using consensus::ConsensusRound; -using consensus::OpId; -using consensus::RaftConfigPB; -using consensus::RaftPeerPB; -using consensus::RaftConsensus; -using consensus::TimeManager; -using consensus::ALTER_SCHEMA_OP; -using consensus::WRITE_OP; -using log::Log; -using log::LogAnchorRegistry; -using rpc::Messenger; -using rpc::ResultTracker; -using strings::Substitute; -using tserver::TabletServerErrorPB; - -// ============================================================================ -// Tablet Peer -// ============================================================================ -TabletPeer::TabletPeer(const scoped_refptr& meta, - const consensus::RaftPeerPB& local_peer_pb, - ThreadPool* apply_pool, - Callback mark_dirty_clbk) - : meta_(meta), - tablet_id_(meta->tablet_id()), - local_peer_pb_(local_peer_pb), - state_(NOT_STARTED), - last_status_("Tablet initializing..."), - apply_pool_(apply_pool), - log_anchor_registry_(new LogAnchorRegistry()), - mark_dirty_clbk_(std::move(mark_dirty_clbk)) {} - -TabletPeer::~TabletPeer() { - std::lock_guard lock(lock_); - // We should either have called Shutdown(), or we should have never called - // Init(). - CHECK(!tablet_) - << "TabletPeer not fully shut down. State: " - << TabletStatePB_Name(state_); -} - -Status TabletPeer::Init(const shared_ptr& tablet, - const scoped_refptr& clock, - const shared_ptr& messenger, - const scoped_refptr& result_tracker, - const scoped_refptr& log, - const scoped_refptr& metric_entity) { - - DCHECK(tablet) << "A TabletPeer must be provided with a Tablet"; - DCHECK(log) << "A TabletPeer must be provided with a Log"; - - RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_)); - prepare_pool_->SetQueueLengthHistogram( - METRIC_op_prepare_queue_length.Instantiate(metric_entity)); - prepare_pool_->SetQueueTimeMicrosHistogram( - METRIC_op_prepare_queue_time.Instantiate(metric_entity)); - prepare_pool_->SetRunTimeMicrosHistogram( - METRIC_op_prepare_run_time.Instantiate(metric_entity)); - - { - std::lock_guard lock(lock_); - CHECK_EQ(BOOTSTRAPPING, state_); - tablet_ = tablet; - clock_ = clock; - messenger_ = messenger; - log_ = log; - result_tracker_ = result_tracker; - - ConsensusOptions options; - options.tablet_id = meta_->tablet_id(); - - TRACE("Creating consensus instance"); - - unique_ptr cmeta; - RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_, - meta_->fs_manager()->uuid(), &cmeta)); - - scoped_refptr time_manager(new TimeManager( - clock, tablet_->mvcc_manager()->GetCleanTimestamp())); - - consensus_ = RaftConsensus::Create(options, - std::move(cmeta), - local_peer_pb_, - metric_entity, - time_manager, - this, - messenger_, - log_.get(), - tablet_->mem_tracker(), - mark_dirty_clbk_); - } - - if (tablet_->metrics() != nullptr) { - TRACE("Starting instrumentation"); - txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity()); - } - txn_tracker_.StartMemoryTracking(tablet_->mem_tracker()); - - TRACE("TabletPeer::Init() finished"); - VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted"; - return Status::OK(); -} - -Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) { - std::lock_guard l(state_change_lock_); - TRACE("Starting consensus"); - - VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting"; - - VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig()); - - RETURN_NOT_OK(consensus_->Start(bootstrap_info)); - { - std::lock_guard lock(lock_); - CHECK_EQ(state_, BOOTSTRAPPING); - state_ = RUNNING; - } - - // Because we changed the tablet state, we need to re-report the tablet to the master. - mark_dirty_clbk_.Run("Started TabletPeer"); - - return Status::OK(); -} - -const consensus::RaftConfigPB TabletPeer::RaftConfig() const { - CHECK(consensus_) << "consensus is null"; - return consensus_->CommittedConfig(); -} - -void TabletPeer::Shutdown() { - - LOG(INFO) << "Initiating TabletPeer shutdown for tablet: " << tablet_id_; - - { - std::unique_lock lock(lock_); - if (state_ == QUIESCING || state_ == SHUTDOWN) { - lock.unlock(); - WaitUntilShutdown(); - return; - } - state_ = QUIESCING; - } - - std::lock_guard l(state_change_lock_); - // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here - // to ensure that any currently running operation finishes before we proceed with - // the rest of the shutdown sequence. In particular, a maintenance operation could - // indirectly end up calling into the log, which we are about to shut down. - if (tablet_) tablet_->UnregisterMaintenanceOps(); - UnregisterMaintenanceOps(); - - if (consensus_) consensus_->Shutdown(); - - // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message. - LOG_SLOW_EXECUTION(WARNING, 1000, - Substitute("TabletPeer: tablet $0: Waiting for Transactions to complete", tablet_id())) { - txn_tracker_.WaitForAllToFinish(); - } - - if (prepare_pool_) { - prepare_pool_->Shutdown(); - } - - if (log_) { - WARN_NOT_OK(log_->Close(), "Error closing the Log."); - } - - if (VLOG_IS_ON(1)) { - VLOG(1) << "TabletPeer: tablet " << tablet_id() << " shut down!"; - } - - if (tablet_) { - tablet_->Shutdown(); - } - - // Only mark the peer as SHUTDOWN when all other components have shut down. - { - std::lock_guard lock(lock_); - // Release mem tracker resources. - consensus_.reset(); - tablet_.reset(); - state_ = SHUTDOWN; - } -} - -void TabletPeer::WaitUntilShutdown() { - while (true) { - { - std::lock_guard lock(lock_); - if (state_ == SHUTDOWN) { - return; - } - } - SleepFor(MonoDelta::FromMilliseconds(10)); - } -} - -Status TabletPeer::CheckRunning() const { - { - std::lock_guard lock(lock_); - if (state_ != RUNNING) { - return Status::IllegalState(Substitute("The tablet is not in a running state: $0", - TabletStatePB_Name(state_))); - } - } - return Status::OK(); -} - -Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) { - MonoTime start(MonoTime::Now()); - - int backoff_exp = 0; - const int kMaxBackoffExp = 8; - while (true) { - bool has_consensus = false; - TabletStatePB cached_state; - { - std::lock_guard lock(lock_); - cached_state = state_; - if (consensus_) { - has_consensus = true; // consensus_ is a set-once object. - } - } - if (cached_state == QUIESCING || cached_state == SHUTDOWN) { - return Status::IllegalState( - Substitute("The tablet is already shutting down or shutdown. State: $0", - TabletStatePB_Name(cached_state))); - } - if (cached_state == RUNNING && has_consensus && consensus_->IsRunning()) { - break; - } - MonoTime now(MonoTime::Now()); - MonoDelta elapsed(now - start); - if (elapsed > timeout) { - return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State; $1", - elapsed.ToString(), TabletStatePB_Name(cached_state))); - } - SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp)); - backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp); - } - return Status::OK(); -} - -Status TabletPeer::SubmitWrite(unique_ptr state) { - RETURN_NOT_OK(CheckRunning()); - - state->SetResultTracker(result_tracker_); - gscoped_ptr transaction(new WriteTransaction(std::move(state), - consensus::LEADER)); - scoped_refptr driver; - RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs(), - &driver)); - return driver->ExecuteAsync(); -} - -Status TabletPeer::SubmitAlterSchema(unique_ptr state) { - RETURN_NOT_OK(CheckRunning()); - - gscoped_ptr transaction( - new AlterSchemaTransaction(std::move(state), consensus::LEADER)); - scoped_refptr driver; - RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs(), &driver)); - return driver->ExecuteAsync(); -} - -void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) const { - std::lock_guard lock(lock_); - DCHECK(status_pb_out != nullptr); - status_pb_out->set_tablet_id(meta_->tablet_id()); - status_pb_out->set_table_name(meta_->table_name()); - status_pb_out->set_last_status(last_status_); - meta_->partition().ToPB(status_pb_out->mutable_partition()); - status_pb_out->set_state(state_); - status_pb_out->set_tablet_data_state(meta_->tablet_data_state()); - if (tablet_) { - status_pb_out->set_estimated_on_disk_size(tablet_->EstimateOnDiskSize()); - } -} - -Status TabletPeer::RunLogGC() { - if (!CheckRunning().ok()) { - return Status::OK(); - } - int32_t num_gced; - log::RetentionIndexes retention = GetRetentionIndexes(); - Status s = log_->GC(retention, &num_gced); - if (!s.ok()) { - s = s.CloneAndPrepend("Unexpected error while running Log GC from TabletPeer"); - LOG(ERROR) << s.ToString(); - } - return Status::OK(); -} - -void TabletPeer::StatusMessage(const std::string& status) { - std::lock_guard lock(lock_); - last_status_ = status; -} - -string TabletPeer::last_status() const { - std::lock_guard lock(lock_); - return last_status_; -} - -void TabletPeer::SetFailed(const Status& error) { - std::lock_guard lock(lock_); - CHECK(!error.ok()); - state_ = FAILED; - error_ = error; - last_status_ = error.ToString(); -} - -string TabletPeer::HumanReadableState() const { - std::lock_guard lock(lock_); - TabletDataState data_state = meta_->tablet_data_state(); - // If failed, any number of things could have gone wrong. - if (state_ == FAILED) { - return Substitute("$0 ($1): $2", TabletStatePB_Name(state_), - TabletDataState_Name(data_state), - error_.ToString()); - // If it's copying, or tombstoned, that is the important thing - // to show. - } else if (data_state != TABLET_DATA_READY) { - return TabletDataState_Name(data_state); - } - // Otherwise, the tablet's data is in a "normal" state, so we just display - // the runtime state (BOOTSTRAPPING, RUNNING, etc). - return TabletStatePB_Name(state_); -} - -void TabletPeer::GetInFlightTransactions(Transaction::TraceType trace_type, - vector* out) const { - vector > pending_transactions; - txn_tracker_.GetPendingTransactions(&pending_transactions); - for (const scoped_refptr& driver : pending_transactions) { - if (driver->state() != nullptr) { - consensus::TransactionStatusPB status_pb; - status_pb.mutable_op_id()->CopyFrom(driver->GetOpId()); - switch (driver->tx_type()) { - case Transaction::WRITE_TXN: - status_pb.set_tx_type(consensus::WRITE_OP); - break; - case Transaction::ALTER_SCHEMA_TXN: - status_pb.set_tx_type(consensus::ALTER_SCHEMA_OP); - break; - } - status_pb.set_description(driver->ToString()); - int64_t running_for_micros = - (MonoTime::Now() - driver->start_time()).ToMicroseconds(); - status_pb.set_running_for_micros(running_for_micros); - if (trace_type == Transaction::TRACE_TXNS) { - status_pb.set_trace_buffer(driver->trace()->DumpToString()); - } - out->push_back(status_pb); - } - } -} - -log::RetentionIndexes TabletPeer::GetRetentionIndexes() const { - // Let consensus set a minimum index that should be anchored. - // This ensures that we: - // (a) don't GC any operations which are still in flight - // (b) don't GC any operations that are needed to catch up lagging peers. - log::RetentionIndexes ret = consensus_->GetRetentionIndexes(); - - // If we never have written to the log, no need to proceed. - if (ret.for_durability == 0) return ret; - - // Next, we interrogate the anchor registry. - // Returns OK if minimum known, NotFound if no anchors are registered. - { - int64_t min_anchor_index; - Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index); - if (PREDICT_FALSE(!s.ok())) { - DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString(); - } else { - ret.for_durability = std::min(ret.for_durability, min_anchor_index); - } - } - - // Next, interrogate the TransactionTracker. - vector > pending_transactions; - txn_tracker_.GetPendingTransactions(&pending_transactions); - for (const scoped_refptr& driver : pending_transactions) { - OpId tx_op_id = driver->GetOpId(); - // A transaction which doesn't have an opid hasn't been submitted for replication yet and - // thus has no need to anchor the log. - if (tx_op_id.IsInitialized()) { - ret.for_durability = std::min(ret.for_durability, tx_op_id.index()); - } - } - - return ret; -} - -Status TabletPeer::GetReplaySizeMap(map* replay_size_map) const { - RETURN_NOT_OK(CheckRunning()); - log_->GetReplaySizeMap(replay_size_map); - return Status::OK(); -} - -Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const { - RETURN_NOT_OK(CheckRunning()); - *retention_size = log_->GetGCableDataSize(GetRetentionIndexes()); - return Status::OK(); -} - -Status TabletPeer::StartReplicaTransaction(const scoped_refptr& round) { - { - std::lock_guard lock(lock_); - if (state_ != RUNNING && state_ != BOOTSTRAPPING) { - return Status::IllegalState(TabletStatePB_Name(state_)); - } - } - - consensus::ReplicateMsg* replicate_msg = round->replicate_msg(); - DCHECK(replicate_msg->has_timestamp()); - gscoped_ptr transaction; - switch (replicate_msg->op_type()) { - case WRITE_OP: - { - DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica" - " transaction must receive a WriteRequestPB"; - unique_ptr tx_state( - new WriteTransactionState( - this, - &replicate_msg->write_request(), - replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr)); - tx_state->SetResultTracker(result_tracker_); - - transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA)); - break; - } - case ALTER_SCHEMA_OP: - { - DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica" - " transaction must receive an AlterSchemaRequestPB"; - unique_ptr tx_state( - new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(), - nullptr)); - transaction.reset( - new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA)); - break; - } - default: - LOG(FATAL) << "Unsupported Operation Type"; - } - - // TODO(todd) Look at wiring the stuff below on the driver - TransactionState* state = transaction->state(); - state->set_consensus_round(round); - - scoped_refptr driver; - RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver)); - - // Unretained is required to avoid a refcount cycle. - state->consensus_round()->SetConsensusReplicatedCallback( - Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get()))); - - RETURN_NOT_OK(driver->ExecuteAsync()); - return Status::OK(); -} - -Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr transaction, - scoped_refptr* driver) { - scoped_refptr tx_driver = new TransactionDriver( - &txn_tracker_, - consensus_.get(), - log_.get(), - prepare_pool_.get(), - apply_pool_, - &txn_order_verifier_); - RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER)); - driver->swap(tx_driver); - - return Status::OK(); -} - -Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr transaction, - scoped_refptr* driver) { - scoped_refptr tx_driver = new TransactionDriver( - &txn_tracker_, - consensus_.get(), - log_.get(), - prepare_pool_.get(), - apply_pool_, - &txn_order_verifier_); - RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA)); - driver->swap(tx_driver); - - return Status::OK(); -} - -void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) { - // Taking state_change_lock_ ensures that we don't shut down concurrently with - // this last start-up task. - std::lock_guard l(state_change_lock_); - - if (state() != RUNNING) { - LOG(WARNING) << "Not registering maintenance operations for " << tablet_ - << ": tablet not in RUNNING state"; - return; - } - - DCHECK(maintenance_ops_.empty()); - - gscoped_ptr mrs_flush_op(new FlushMRSOp(this)); - maint_mgr->RegisterOp(mrs_flush_op.get()); - maintenance_ops_.push_back(mrs_flush_op.release()); - - gscoped_ptr dms_flush_op(new FlushDeltaMemStoresOp(this)); - maint_mgr->RegisterOp(dms_flush_op.get()); - maintenance_ops_.push_back(dms_flush_op.release()); - - gscoped_ptr log_gc(new LogGCOp(this)); - maint_mgr->RegisterOp(log_gc.get()); - maintenance_ops_.push_back(log_gc.release()); - - tablet_->RegisterMaintenanceOps(maint_mgr); -} - -void TabletPeer::UnregisterMaintenanceOps() { - DCHECK(state_change_lock_.is_locked()); - for (MaintenanceOp* op : maintenance_ops_) { - op->Unregister(); - } - STLDeleteElements(&maintenance_ops_); -} - -Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() { - // This callback is triggered prior to any TabletMetadata flush. - // The guarantee that we are trying to enforce is this: - // - // If an operation has been flushed to stable storage (eg a DRS or DeltaFile) - // then its COMMIT message must be present in the log. - // - // The purpose for this is so that, during bootstrap, we can accurately identify - // whether each operation has been flushed. If we don't see a COMMIT message for - // an operation, then we assume it was not completely applied and needs to be - // re-applied. Thus, if we had something on disk but with no COMMIT message, - // we'd attempt to double-apply the write, resulting in an error (eg trying to - // delete an already-deleted row). - // - // So, to enforce this property, we do two steps: - // - // 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC. - // - // Because the operations always enqueue their COMMIT message to the log - // before calling Commit(), this ensures that any in-flight operations have - // their commit messages "en route". - // - // NOTE: we only wait for those operations that have started their Apply() phase. - // Any operations which haven't yet started applying haven't made any changes - // to in-memory state: thus, they obviously couldn't have made any changes to - // on-disk storage either (data can only get to the disk by going through an in-memory - // store). Only those that have started Apply() could have potentially written some - // data which is now on disk. - // - // Perhaps more importantly, if we waited on operations that hadn't started their - // Apply() phase, we might be waiting forever -- for example, if a follower has been - // partitioned from its leader, it may have operations sitting around in flight - // for quite a long time before eventually aborting or committing. This would - // end up blocking all flushes if we waited on it. - // - // 2) Flush the log - // - // This ensures that the above-mentioned commit messages are not just enqueued - // to the log, but also on disk. - VLOG(1) << "T " << tablet_->metadata()->tablet_id() - << ": Waiting for in-flight transactions to commit."; - LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") { - tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit(); - } - VLOG(1) << "T " << tablet_->metadata()->tablet_id() - << ": Waiting for the log queue to be flushed."; - LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") { - RETURN_NOT_OK(log_->WaitUntilAllFlushed()); - } - return Status::OK(); -} - - -} // namespace tablet -} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h deleted file mode 100644 index d622626..0000000 --- a/src/kudu/tablet/tablet_peer.h +++ /dev/null @@ -1,377 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef KUDU_TABLET_TABLET_PEER_H_ -#define KUDU_TABLET_TABLET_PEER_H_ - -#include -#include -#include -#include -#include - -#include "kudu/consensus/consensus.h" -#include "kudu/consensus/log.h" -#include "kudu/consensus/time_manager.h" -#include "kudu/gutil/callback.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/tablet/tablet.h" -#include "kudu/tablet/transaction_order_verifier.h" -#include "kudu/tablet/transactions/transaction_tracker.h" -#include "kudu/util/metrics.h" -#include "kudu/util/semaphore.h" - -namespace kudu { - -namespace log { -class LogAnchorRegistry; -} - -namespace rpc { -class Messenger; -class ResultTracker; -} - -namespace tserver { -class CatchUpServiceTest; -} - -class MaintenanceManager; -class MaintenanceOp; - -namespace tablet { -class LeaderTransactionDriver; -class ReplicaTransactionDriver; -class TabletPeer; -class TabletStatusPB; -class TabletStatusListener; -class TransactionDriver; - -// Interface by which various tablet-related processes can report back their status -// to TabletPeer without having to have a circular class dependency, and so that -// those other classes can be easily tested without constructing a TabletPeer. -class TabletStatusListener { - public: - virtual ~TabletStatusListener() {} - - virtual void StatusMessage(const std::string& status) = 0; -}; - -// A peer in a tablet consensus configuration, which coordinates writes to tablets. -// Each time Write() is called this class appends a new entry to a replicated -// state machine through a consensus algorithm, which makes sure that other -// peers see the same updates in the same order. In addition to this, this -// class also splits the work and coordinates multi-threaded execution. -class TabletPeer : public RefCountedThreadSafe, - public consensus::ReplicaTransactionFactory, - public TabletStatusListener { - public: - TabletPeer(const scoped_refptr& meta, - const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool, - Callback mark_dirty_clbk); - - // Initializes the TabletPeer, namely creating the Log and initializing - // Consensus. - Status Init(const std::shared_ptr& tablet, - const scoped_refptr& clock, - const std::shared_ptr& messenger, - const scoped_refptr& result_tracker, - const scoped_refptr& log, - const scoped_refptr& metric_entity); - - // Starts the TabletPeer, making it available for Write()s. If this - // TabletPeer is part of a consensus configuration this will connect it to other peers - // in the consensus configuration. - Status Start(const consensus::ConsensusBootstrapInfo& info); - - // Shutdown this tablet peer. - // If a shutdown is already in progress, blocks until that shutdown is complete. - void Shutdown(); - - // Check that the tablet is in a RUNNING state. - Status CheckRunning() const; - - // Wait until the tablet is in a RUNNING state or if there's a timeout. - // TODO have a way to wait for any state? - Status WaitUntilConsensusRunning(const MonoDelta& timeout); - - // Submits a write to a tablet and executes it asynchronously. - // The caller is expected to build and pass a TrasactionContext that points - // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's - // MvccManager. - Status SubmitWrite(std::unique_ptr tx_state); - - // Called by the tablet service to start an alter schema transaction. - // - // The transaction contains all the information required to execute the - // AlterSchema operation and send the response back. - // - // If the returned Status is OK, the response to the client will be sent - // asynchronously. Otherwise the tablet service will have to send the response directly. - // - // The AlterSchema operation is taking the tablet component lock in exclusive mode - // meaning that no other operation on the tablet can be executed while the - // AlterSchema is in progress. - Status SubmitAlterSchema(std::unique_ptr tx_state); - - void GetTabletStatusPB(TabletStatusPB* status_pb_out) const; - - // Used by consensus to create and start a new ReplicaTransaction. - virtual Status StartReplicaTransaction( - const scoped_refptr& round) OVERRIDE; - - consensus::Consensus* consensus() { - std::lock_guard lock(lock_); - return consensus_.get(); - } - - scoped_refptr shared_consensus() const { - std::lock_guard lock(lock_); - return consensus_; - } - - Tablet* tablet() const { - std::lock_guard lock(lock_); - return tablet_.get(); - } - - scoped_refptr time_manager() const { - return consensus_->time_manager(); - } - - std::shared_ptr shared_tablet() const { - std::lock_guard lock(lock_); - return tablet_; - } - - const TabletStatePB state() const { - std::lock_guard lock(lock_); - return state_; - } - - // Returns the current Raft configuration. - const consensus::RaftConfigPB RaftConfig() const; - - // If any peers in the consensus configuration lack permanent uuids, get them via an - // RPC call and update. - // TODO: move this to raft_consensus.h. - Status UpdatePermanentUuids(); - - // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up. - void SetBootstrapping() { - std::lock_guard lock(lock_); - CHECK_EQ(NOT_STARTED, state_); - state_ = BOOTSTRAPPING; - } - - // Implementation of TabletStatusListener::StatusMessage(). - void StatusMessage(const std::string& status) override; - - // Retrieve the last human-readable status of this tablet peer. - std::string last_status() const; - - // Sets the tablet state to FAILED additionally setting the error to the provided - // one. - void SetFailed(const Status& error); - - // Returns the error that occurred, when state is FAILED. - Status error() const { - std::lock_guard lock(lock_); - return error_; - } - - // Returns a human-readable string indicating the state of the tablet. - // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING", - // etc. For use in places like the Web UI. - std::string HumanReadableState() const; - - // Adds list of transactions in-flight at the time of the call to 'out'. - void GetInFlightTransactions(Transaction::TraceType trace_type, - std::vector* out) const; - - // Returns the log indexes to be retained for durability and to catch up peers. - // Used for selection of log segments to delete during Log GC. - log::RetentionIndexes GetRetentionIndexes() const; - - // See Log::GetReplaySizeMap(...). - // - // Returns a non-ok status if the tablet isn't running. - Status GetReplaySizeMap(std::map* replay_size_map) const; - - // Returns the amount of bytes that would be GC'd if RunLogGC() was called. - // - // Returns a non-ok status if the tablet isn't running. - Status GetGCableDataSize(int64_t* retention_size) const; - - // Return a pointer to the Log. - // TabletPeer keeps a reference to Log after Init(). - log::Log* log() const { - return log_.get(); - } - - server::Clock* clock() { - return clock_.get(); - } - - const scoped_refptr& log_anchor_registry() const { - return log_anchor_registry_; - } - - // Returns the tablet_id of the tablet managed by this TabletPeer. - // Returns the correct tablet_id even if the underlying tablet is not available - // yet. - const std::string& tablet_id() const { return tablet_id_; } - - // Convenience method to return the permanent_uuid of this peer. - std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); } - - Status NewLeaderTransactionDriver(gscoped_ptr transaction, - scoped_refptr* driver); - - Status NewReplicaTransactionDriver(gscoped_ptr transaction, - scoped_refptr* driver); - - // Tells the tablet's log to garbage collect. - Status RunLogGC(); - - // Register the maintenance ops associated with this peer's tablet, also invokes - // Tablet::RegisterMaintenanceOps(). - void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager); - - // Unregister the maintenance ops associated with this peer's tablet. - // This method is not thread safe. - void UnregisterMaintenanceOps(); - - // Return pointer to the transaction tracker for this peer. - const TransactionTracker* transaction_tracker() const { return &txn_tracker_; } - - const scoped_refptr& tablet_metadata() const { - return meta_; - } - - // Marks the tablet as dirty so that it's included in the next heartbeat. - void MarkTabletDirty(const std::string& reason) { - mark_dirty_clbk_.Run(reason); - } - - private: - friend class RefCountedThreadSafe; - friend class TabletPeerTest; - FRIEND_TEST(TabletPeerTest, TestMRSAnchorPreventsLogGC); - FRIEND_TEST(TabletPeerTest, TestDMSAnchorPreventsLogGC); - FRIEND_TEST(TabletPeerTest, TestActiveTransactionPreventsLogGC); - - ~TabletPeer(); - - // Wait until the TabletPeer is fully in SHUTDOWN state. - void WaitUntilShutdown(); - - // After bootstrap is complete and consensus is setup this initiates the transactions - // that were not complete on bootstrap. - // Not implemented yet. See .cc file. - Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role, - const consensus::ConsensusBootstrapInfo& bootstrap_info); - - const scoped_refptr meta_; - - const std::string tablet_id_; - - const consensus::RaftPeerPB local_peer_pb_; - - TabletStatePB state_; - Status error_; - TransactionTracker txn_tracker_; - TransactionOrderVerifier txn_order_verifier_; - scoped_refptr log_; - std::shared_ptr tablet_; - std::shared_ptr messenger_; - scoped_refptr consensus_; - simple_spinlock prepare_replicate_lock_; - - // Lock protecting state_, last_status_, as well as smart pointers to collaborating - // classes such as tablet_ and consensus_. - mutable simple_spinlock lock_; - - // The human-readable last status of the tablet, displayed on the web page, command line - // tools, etc. - std::string last_status_; - - // Lock taken during Init/Shutdown which ensures that only a single thread - // attempts to perform major lifecycle operations (Init/Shutdown) at once. - // This must be acquired before acquiring lock_ if they are acquired together. - // We don't just use lock_ since the lifecycle operations may take a while - // and we'd like other threads to be able to quickly poll the state_ variable - // during them in order to reject RPCs, etc. - mutable simple_spinlock state_change_lock_; - - // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_' - // is single-threaded, moving to a multi-tablet setup where multiple TabletPeers - // use the same 'prepare_pool_' needs to enforce that, for a single - // TabletPeer, PrepareTasks are executed *serially*. - // TODO move the prepare pool to TabletServer. - gscoped_ptr prepare_pool_; - - // Pool that executes apply tasks for transactions. This is a multi-threaded - // pool, constructor-injected by either the Master (for system tables) or - // the Tablet server. - ThreadPool* apply_pool_; - - scoped_refptr clock_; - - scoped_refptr log_anchor_registry_; - - // Function to mark this TabletPeer's tablet as dirty in the TSTabletManager. - // - // Must be called whenever cluster membership or leadership changes, or when - // the tablet's schema changes. - Callback mark_dirty_clbk_; - - // List of maintenance operations for the tablet that need information that only the peer - // can provide. - std::vector maintenance_ops_; - - // The result tracker for writes. - scoped_refptr result_tracker_; - - DISALLOW_COPY_AND_ASSIGN(TabletPeer); -}; - -// A callback to wait for the in-flight transactions to complete and to flush -// the Log when they do. -// Tablet is passed as a raw pointer as this callback is set in TabletMetadata and -// were we to keep the tablet as a shared_ptr a circular dependency would occur: -// callback->tablet->metadata->callback. Since the tablet indirectly owns this -// callback we know that is must still be alive when it fires. -class FlushInflightsToLogCallback : public RefCountedThreadSafe { - public: - FlushInflightsToLogCallback(Tablet* tablet, - const scoped_refptr& log) - : tablet_(tablet), - log_(log) {} - - Status WaitForInflightsAndFlushLog(); - - private: - Tablet* tablet_; - scoped_refptr log_; -}; - - -} // namespace tablet -} // namespace kudu - -#endif /* KUDU_TABLET_TABLET_PEER_H_ */ http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc deleted file mode 100644 index 508d69f..0000000 --- a/src/kudu/tablet/tablet_peer_mm_ops.cc +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "kudu/tablet/tablet_peer_mm_ops.h" - -#include -#include -#include -#include -#include - -#include "kudu/gutil/strings/substitute.h" -#include "kudu/tablet/tablet_metrics.h" -#include "kudu/util/flag_tags.h" -#include "kudu/util/maintenance_manager.h" -#include "kudu/util/metrics.h" - -DEFINE_int32(flush_threshold_mb, 1024, - "Size at which MemRowSet flushes are triggered. " - "A MRS can still flush below this threshold if it if hasn't flushed in a while, " - "or if the server-wide memory limit has been reached."); -TAG_FLAG(flush_threshold_mb, experimental); - -DEFINE_int32(flush_threshold_secs, 2 * 60, - "Number of seconds after which a non-empty MemRowSet will become flushable " - "even if it is not large."); -TAG_FLAG(flush_threshold_secs, experimental); - - -METRIC_DEFINE_gauge_uint32(tablet, log_gc_running, - "Log GCs Running", - kudu::MetricUnit::kOperations, - "Number of log GC operations currently running."); -METRIC_DEFINE_histogram(tablet, log_gc_duration, - "Log GC Duration", - kudu::MetricUnit::kMilliseconds, - "Time spent garbage collecting the logs.", 60000LU, 1); - -namespace kudu { -namespace tablet { - -using std::map; -using strings::Substitute; - -// Upper bound for how long it takes to reach "full perf improvement" in time-based flushing. -const double kFlushUpperBoundMs = 60 * 60 * 1000; - -// -// FlushOpPerfImprovementPolicy. -// - -void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats, - double elapsed_ms) { - if (stats->ram_anchored() > FLAGS_flush_threshold_mb * 1024 * 1024) { - // If we're over the user-specified flush threshold, then consider the perf - // improvement to be 1 for every extra MB. This produces perf_improvement results - // which are much higher than any compaction would produce, and means that, when - // there is an MRS over threshold, a flush will almost always be selected instead of - // a compaction. That's not necessarily a good thing, but in the absence of better - // heuristics, it will do for now. - double extra_mb = - static_cast(FLAGS_flush_threshold_mb - (stats->ram_anchored()) / (1024 * 1024)); - stats->set_perf_improvement(extra_mb); - } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000) { - // Even if we aren't over the threshold, consider flushing if we haven't flushed - // in a long time. But, don't give it a large perf_improvement score. We should - // only do this if we really don't have much else to do, and if we've already waited a bit. - // The following will give an improvement that's between 0.0 and 1.0, gradually growing - // as 'elapsed_ms' approaches 'kFlushUpperBoundMs'. - double perf = elapsed_ms / kFlushUpperBoundMs; - if (perf > 1.0) { - perf = 1.0; - } - stats->set_perf_improvement(perf); - } -} - -// -// FlushMRSOp. -// - -void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) { - std::lock_guard l(lock_); - - map replay_size_map; - if (tablet_peer_->tablet()->MemRowSetEmpty() || - !tablet_peer_->GetReplaySizeMap(&replay_size_map).ok()) { - return; - } - - { - std::unique_lock lock(tablet_peer_->tablet()->rowsets_flush_sem_, std::defer_lock); - stats->set_runnable(lock.try_lock()); - } - - stats->set_ram_anchored(tablet_peer_->tablet()->MemRowSetSize()); - stats->set_logs_retained_bytes( - tablet_peer_->tablet()->MemRowSetLogReplaySize(replay_size_map)); - - // TODO(todd): use workload statistics here to find out how "hot" the tablet has - // been in the last 5 minutes. - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush( - stats, - time_since_flush_.elapsed().wall_millis()); -} - -bool FlushMRSOp::Prepare() { - // Try to acquire the rowsets_flush_sem_. If we can't, the Prepare step - // fails. This also implies that only one instance of FlushMRSOp can be - // running at once. - return tablet_peer_->tablet()->rowsets_flush_sem_.try_lock(); -} - -void FlushMRSOp::Perform() { - CHECK(!tablet_peer_->tablet()->rowsets_flush_sem_.try_lock()); - - KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushUnlocked(), - Substitute("FlushMRS failed on $0", tablet_peer_->tablet_id())); - - { - std::lock_guard l(lock_); - time_since_flush_.start(); - } - tablet_peer_->tablet()->rowsets_flush_sem_.unlock(); -} - -scoped_refptr FlushMRSOp::DurationHistogram() const { - return tablet_peer_->tablet()->metrics()->flush_mrs_duration; -} - -scoped_refptr > FlushMRSOp::RunningGauge() const { - return tablet_peer_->tablet()->metrics()->flush_mrs_running; -} - -// -// FlushDeltaMemStoresOp. -// - -void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) { - std::lock_guard l(lock_); - int64_t dms_size; - int64_t retention_size; - map max_idx_to_replay_size; - if (tablet_peer_->tablet()->DeltaMemRowSetEmpty() || - !tablet_peer_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) { - return; - } - tablet_peer_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_replay_size, - &dms_size, &retention_size); - - stats->set_ram_anchored(dms_size); - stats->set_runnable(true); - stats->set_logs_retained_bytes(retention_size); - - FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush( - stats, - time_since_flush_.elapsed().wall_millis()); -} - -void FlushDeltaMemStoresOp::Perform() { - map max_idx_to_replay_size; - if (!tablet_peer_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) { - LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id(); - return; - } - KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushDMSWithHighestRetention( - max_idx_to_replay_size), - Substitute("Failed to flush DMS on $0", - tablet_peer_->tablet()->tablet_id())); - { - std::lock_guard l(lock_); - time_since_flush_.start(); - } -} - -scoped_refptr FlushDeltaMemStoresOp::DurationHistogram() const { - return tablet_peer_->tablet()->metrics()->flush_dms_duration; -} - -scoped_refptr > FlushDeltaMemStoresOp::RunningGauge() const { - return tablet_peer_->tablet()->metrics()->flush_dms_running; -} - -// -// LogGCOp. -// - -LogGCOp::LogGCOp(TabletPeer* tablet_peer) - : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_peer->tablet()->tablet_id().c_str()), - MaintenanceOp::LOW_IO_USAGE), - tablet_peer_(tablet_peer), - log_gc_duration_(METRIC_log_gc_duration.Instantiate( - tablet_peer->tablet()->GetMetricEntity())), - log_gc_running_(METRIC_log_gc_running.Instantiate( - tablet_peer->tablet()->GetMetricEntity(), 0)), - sem_(1) {} - -void LogGCOp::UpdateStats(MaintenanceOpStats* stats) { - int64_t retention_size; - - if (!tablet_peer_->GetGCableDataSize(&retention_size).ok()) { - return; - } - - stats->set_logs_retained_bytes(retention_size); - stats->set_runnable(sem_.GetValue() == 1); -} - -bool LogGCOp::Prepare() { - return sem_.try_lock(); -} - -void LogGCOp::Perform() { - CHECK(!sem_.try_lock()); - - tablet_peer_->RunLogGC(); - - sem_.unlock(); -} - -scoped_refptr LogGCOp::DurationHistogram() const { - return log_gc_duration_; -} - -scoped_refptr > LogGCOp::RunningGauge() const { - return log_gc_running_; -} - -} // namespace tablet -} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer_mm_ops.h b/src/kudu/tablet/tablet_peer_mm_ops.h deleted file mode 100644 index a985802..0000000 --- a/src/kudu/tablet/tablet_peer_mm_ops.h +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef KUDU_TABLET_TABLET_PEER_MM_OPS_H_ -#define KUDU_TABLET_TABLET_PEER_MM_OPS_H_ - -#include "kudu/tablet/tablet_peer.h" -#include "kudu/util/maintenance_manager.h" -#include "kudu/util/stopwatch.h" - -namespace kudu { - -class Histogram; -template -class AtomicGauge; - -namespace tablet { - -class FlushOpPerfImprovementPolicy { - public: - ~FlushOpPerfImprovementPolicy() {} - - // Sets the performance improvement based on the anchored ram if it's over the threshold, - // else it will set it based on how long it has been since the last flush. - static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms); - - private: - FlushOpPerfImprovementPolicy() {} -}; - -// Maintenance op for MRS flush. Only one can happen at a time. -class FlushMRSOp : public MaintenanceOp { - public: - explicit FlushMRSOp(TabletPeer* tablet_peer) - : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_peer->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_peer_(tablet_peer) { - time_since_flush_.start(); - } - - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - - virtual bool Prepare() OVERRIDE; - - virtual void Perform() OVERRIDE; - - virtual scoped_refptr DurationHistogram() const OVERRIDE; - - virtual scoped_refptr > RunningGauge() const OVERRIDE; - - private: - // Lock protecting time_since_flush_. - mutable simple_spinlock lock_; - Stopwatch time_since_flush_; - - TabletPeer *const tablet_peer_; -}; - -// Maintenance op for DMS flush. -// Reports stats for all the DMS this tablet contains but only flushes one in Perform(). -class FlushDeltaMemStoresOp : public MaintenanceOp { - public: - explicit FlushDeltaMemStoresOp(TabletPeer* tablet_peer) - : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)", - tablet_peer->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_peer_(tablet_peer) { - time_since_flush_.start(); - } - - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - - virtual bool Prepare() OVERRIDE { - return true; - } - - virtual void Perform() OVERRIDE; - - virtual scoped_refptr DurationHistogram() const OVERRIDE; - - virtual scoped_refptr > RunningGauge() const OVERRIDE; - - private: - // Lock protecting time_since_flush_ - mutable simple_spinlock lock_; - Stopwatch time_since_flush_; - - TabletPeer *const tablet_peer_; -}; - -// Maintenance task that runs log GC. Reports log retention that represents the amount of data -// that can be GC'd. -// -// Only one LogGC op can run at a time. -class LogGCOp : public MaintenanceOp { - public: - explicit LogGCOp(TabletPeer* tablet_peer); - - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - - virtual bool Prepare() OVERRIDE; - - virtual void Perform() OVERRIDE; - - virtual scoped_refptr DurationHistogram() const OVERRIDE; - - virtual scoped_refptr > RunningGauge() const OVERRIDE; - - private: - TabletPeer *const tablet_peer_; - scoped_refptr log_gc_duration_; - scoped_refptr > log_gc_running_; - mutable Semaphore sem_; -}; - -} // namespace tablet -} // namespace kudu - -#endif /* KUDU_TABLET_TABLET_PEER_MM_OPS_H_ */