kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [4/5] kudu git commit: Rename TabletPeer to TabletReplica
Date Thu, 04 May 2017 22:14:29 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
deleted file mode 100644
index 95c7d0c..0000000
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ /dev/null
@@ -1,564 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/common/partial_row.h"
-#include "kudu/common/timestamp.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_reader.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/server/clock.h"
-#include "kudu/server/logical_clock.h"
-#include "kudu/tablet/transactions/transaction.h"
-#include "kudu/tablet/transactions/transaction_driver.h"
-#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-#include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/threadpool.h"
-
-METRIC_DECLARE_entity(tablet);
-
-DECLARE_int32(flush_threshold_mb);
-
-namespace kudu {
-namespace tablet {
-
-using consensus::CommitMsg;
-using consensus::Consensus;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::MakeOpId;
-using consensus::MinimumOpId;
-using consensus::OpId;
-using consensus::OpIdEquals;
-using consensus::RaftPeerPB;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using log::LogOptions;
-using rpc::Messenger;
-using server::Clock;
-using server::LogicalClock;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using strings::Substitute;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-
-// Schema shared by every test in this file: a single INT32 column named
-// "key", which is also the only key column.
-static Schema GetTestSchema() {
-  const ColumnSchema key_column("key", INT32);
-  return Schema({ key_column }, 1);
-}
-
-// Fixture that wraps the tablet created by KuduTabletTest in a TabletPeer
-// backed by a freshly opened Log and a Raft config containing only the local
-// server as a VOTER, plus helpers that drive writes through the peer and
-// inspect log anchoring and retention.
-class TabletPeerTest : public KuduTabletTest {
- public:
-  TabletPeerTest()
-    : KuduTabletTest(GetTestSchema()),
-      insert_counter_(0),
-      delete_counter_(0) {
-  }
-
-  virtual void SetUp() OVERRIDE {
-    KuduTabletTest::SetUp();
-
-    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
-
-    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
-    ASSERT_OK(builder.Build(&messenger_));
-
-    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
-
-    RaftPeerPB config_peer;
-    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
-    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
-    config_peer.mutable_last_known_addr()->set_port(0);
-    config_peer.set_member_type(RaftPeerPB::VOTER);
-
-    // "Bootstrap" and start the TabletPeer.
-    tablet_peer_.reset(
-      new TabletPeer(make_scoped_refptr(tablet()->metadata()),
-                     config_peer,
-                     apply_pool_.get(),
-                     Bind(&TabletPeerTest::TabletPeerStateChangedCallback,
-                          Unretained(this),
-                          tablet()->tablet_id())));
-
-    // Make TabletPeer use the same LogAnchorRegistry as the Tablet created by the harness.
-    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
-    // TabletMetadata for consumption by TabletPeer before Tablet is instantiated.
-    tablet_peer_->log_anchor_registry_ = tablet()->log_anchor_registry_;
-
-    // Single-member config with no committed config opid yet.
-    RaftConfigPB config;
-    config.add_peers()->CopyFrom(config_peer);
-    config.set_opid_index(consensus::kInvalidOpIdIndex);
-
-    // Persist consensus metadata up front; TabletPeer::Init() loads it back.
-    unique_ptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
-                                        tablet()->tablet_id(),
-                                        tablet()->metadata()->fs_manager()->uuid(),
-                                        config,
-                                        consensus::kMinimumTerm, &cmeta));
-
-    scoped_refptr<Log> log;
-    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
-                        *tablet()->schema(), tablet()->metadata()->schema_version(),
-                        metric_entity_.get(), &log));
-
-    tablet_peer_->SetBootstrapping();
-    ASSERT_OK(tablet_peer_->Init(tablet(),
-                                 clock(),
-                                 messenger_,
-                                 scoped_refptr<rpc::ResultTracker>(),
-                                 log,
-                                 metric_entity_));
-  }
-
-  // Starts the peer and blocks (up to 10 seconds) until it is the leader.
-  Status StartPeer(const ConsensusBootstrapInfo& info) {
-    RETURN_NOT_OK(tablet_peer_->Start(info));
-    RETURN_NOT_OK(tablet_peer_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
-    return Status::OK();
-  }
-
-  void TabletPeerStateChangedCallback(const string& tablet_id, const string& reason) {
-    LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id << ". Reason: " << reason;
-  }
-
-  virtual void TearDown() OVERRIDE {
-    // SetUp() may have failed before these members were created. Only shut
-    // down what exists so that the original failure is what gets reported,
-    // rather than a null-pointer crash here.
-    if (tablet_peer_) {
-      tablet_peer_->Shutdown();
-    }
-    if (apply_pool_) {
-      apply_pool_->Shutdown();
-    }
-    KuduTabletTest::TearDown();
-  }
-
- protected:
-  // Generate monotonic sequence of key column integers.
-  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
-    Schema schema(GetTestSchema());
-    write_req->set_tablet_id(tablet()->tablet_id());
-    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
-
-    KuduPartialRow row(&schema);
-    RETURN_NOT_OK(row.SetInt32("key", insert_counter_++));
-
-    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
-    enc.Add(RowOperationsPB::INSERT, row);
-    return Status::OK();
-  }
-
-  // Generate monotonic sequence of deletions, starting with 0.
-  // Will assert if you try to delete more rows than you inserted.
-  Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
-    CHECK_LT(delete_counter_, insert_counter_);
-    Schema schema(GetTestSchema());
-    write_req->set_tablet_id(tablet()->tablet_id());
-    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
-
-    KuduPartialRow row(&schema);
-    RETURN_NOT_OK(row.SetInt32("key", delete_counter_++));
-
-    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
-    enc.Add(RowOperationsPB::DELETE, row);
-    return Status::OK();
-  }
-
-  // Submits 'req' through 'tablet_peer', waits for it to complete, then rolls
-  // the log onto a new segment. Returns an error if the write or the roll fails.
-  //
-  // A failure to submit still CHECK-fails: the submitted transaction refers to
-  // the stack-local latch and response below, so returning early while it might
-  // still be in flight would risk a use-after-return.
-  Status ExecuteWriteAndRollLog(TabletPeer* tablet_peer, const WriteRequestPB& req) {
-    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer,
-                                                                         &req,
-                                                                         nullptr, // No RequestIdPB
-                                                                         resp.get()));
-
-    CountDownLatch rpc_latch(1);
-    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
-        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
-    CHECK_OK(tablet_peer->SubmitWrite(std::move(tx_state)));
-    rpc_latch.Wait();
-    if (resp->has_error()) {
-      return Status::RuntimeError("write failed",
-                                  Substitute("\nReq:\n$0Resp:\n$1",
-                                             SecureDebugString(req), SecureDebugString(*resp)));
-    }
-
-    // Roll the log after each write.
-    // Usually the append thread does the roll and no additional sync is required. However in
-    // this test the thread that is appending is not the same thread that is rolling the log
-    // so we must make sure the Log's queue is flushed before we roll or we might have a race
-    // between the appender thread and the thread executing the test.
-    RETURN_NOT_OK(tablet_peer->log_->WaitUntilAllFlushed());
-    RETURN_NOT_OK(tablet_peer->log_->AllocateSegmentAndRollOver());
-    return Status::OK();
-  }
-
-  // Execute insert requests and roll log after each one.
-  Status ExecuteInsertsAndRollLogs(int num_inserts) {
-    for (int i = 0; i < num_inserts; i++) {
-      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req));
-    }
-
-    return Status::OK();
-  }
-
-  // Execute delete requests and roll log after each one.
-  Status ExecuteDeletesAndRollLogs(int num_deletes) {
-    for (int i = 0; i < num_deletes; i++) {
-      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialDeleteRequest(req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req));
-    }
-
-    return Status::OK();
-  }
-
-  void AssertNoLogAnchors() {
-    // Make sure that there are no registered anchors in the registry
-    CHECK_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  }
-
-  // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
-  void AssertLogAnchorEarlierThanLogLatest() {
-    log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-    OpId last_log_opid;
-    tablet_peer_->log_->GetLatestEntryOpId(&last_log_opid);
-    CHECK_LT(retention.for_durability, last_log_opid.index())
-      << "Expected valid log anchor, got earliest opid: " << retention.for_durability
-      << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
-      << ")";
-  }
-
-  // Some tests modify gflags (e.g. FLAGS_flush_threshold_mb). Restore them
-  // when the fixture is destroyed so the changes don't leak into other tests.
-  google::FlagSaver flag_saver_;
-
-  int32_t insert_counter_;
-  int32_t delete_counter_;
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-  shared_ptr<Messenger> messenger_;
-  scoped_refptr<TabletPeer> tablet_peer_;
-  gscoped_ptr<ThreadPool> apply_pool_;
-};
-
-// WriteTransaction whose Apply() counts down 'apply_started' on entry and then
-// blocks on 'apply_continue' before running the regular write apply. This lets
-// a test hold a transaction in flight for as long as it needs.
-class DelayedApplyTransaction : public WriteTransaction {
- public:
-  DelayedApplyTransaction(CountDownLatch* apply_started,
-                          CountDownLatch* apply_continue,
-                          unique_ptr<WriteTransactionState> state)
-      : WriteTransaction(std::move(state), consensus::LEADER),
-        started_latch_(DCHECK_NOTNULL(apply_started)),
-        resume_latch_(DCHECK_NOTNULL(apply_continue)) {
-  }
-
-  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
-    // Tell the test that Apply() has been reached...
-    started_latch_->CountDown();
-    LOG(INFO) << "Delaying apply...";
-    // ...then park here until the test releases the transaction.
-    resume_latch_->Wait();
-    LOG(INFO) << "Apply proceeding";
-    return WriteTransaction::Apply(commit_msg);
-  }
-
- private:
-  // Not owned. NOTE(review): the caller must keep both latches alive for the
-  // lifetime of this transaction; the test in this file keeps them on its stack.
-  CountDownLatch* const started_latch_;
-  CountDownLatch* const resume_latch_;
-
-  DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction);
-};
-
-// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
-TEST_F(TabletPeerTest, TestMRSAnchorPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  int32_t num_gced;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(3));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(4, segments.size());
-
-  AssertLogAnchorEarlierThanLogLatest();
-  ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0);
-
-  // Ensure nothing gets deleted.
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
-
-  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
-  // A failed flush would leave the anchor in place, so check its status.
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  AssertNoLogAnchors();
-
-  // The first two segments should be deleted.
-  // The last is anchored due to the commit in the last segment being the last
-  // OpId in the log.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
-TEST_F(TabletPeerTest, TestDMSAnchorPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  int32_t num_gced;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(3, segments.size());
-
-  // Flush MRS & GC log so the next mutation goes into a DMS.
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  // We will only GC 1, and have 1 left because the earliest needed OpId falls
-  // back to the latest OpId written to the Log if no anchors are set.
-  ASSERT_EQ(1, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-  AssertNoLogAnchors();
-
-  OpId id;
-  log->GetLatestEntryOpId(&id);
-  LOG(INFO) << "Before: " << SecureShortDebugString(id);
-
-  // There are no anchors now and the last operation in the log is 0.3.
-  // A single delete (0.4) would be both the operation anchored by the DMS and
-  // the last operation in the log, so the anchor could not be strictly earlier
-  // than the log's latest OpId. With two deletes the DMS anchors 0.4 while the
-  // log's latest operation is 0.5, which is what the assertion below checks.
-
-  // Execute two mutations.
-  ASSERT_OK(ExecuteDeletesAndRollLogs(2));
-  AssertLogAnchorEarlierThanLogLatest();
-  ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(4, segments.size());
-
-  // Execute another couple inserts, but Flush it so it doesn't anchor.
-  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(6, segments.size());
-
-  // Ensure the delta and last insert remain in the logs, anchored by the delta.
-  // Note that this will allow GC of the 2nd insert done above.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(1, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Flush DMS to release the anchor. A failed flush would keep the anchor,
-  // so check its status.
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-
-  // Verify no anchors after Flush().
-  AssertNoLogAnchors();
-
-  // We should only hang onto one segment due to no anchors.
-  // The last log OpId is the commit in the last segment, so it only anchors
-  // that segment, not the previous, because it's not the first OpId in the
-  // segment.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(3, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
-TEST_F(TabletPeerTest, TestActiveTransactionPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  int32_t num_gced;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(4));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
-  ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-
-  // Verify no anchors after Flush().
-  AssertNoLogAnchors();
-
-  // Now create a long-lived Transaction that hangs during Apply().
-  // Allow other transactions to go through. Logs should be populated, but the
-  // long-lived Transaction should prevent the log from being deleted since it
-  // is in-flight.
-  CountDownLatch rpc_latch(1);
-  CountDownLatch apply_started(1);
-  CountDownLatch apply_continue(1);
-  gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-  gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-  {
-    // Long-running mutation.
-    ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer_.get(),
-                                                                         req.get(),
-                                                                         nullptr, // No RequestIdPB
-                                                                         resp.get()));
-
-    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
-        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
-    gscoped_ptr<DelayedApplyTransaction> transaction(
-        new DelayedApplyTransaction(&apply_started,
-                                    &apply_continue,
-                                    std::move(tx_state)));
-
-    scoped_refptr<TransactionDriver> driver;
-    ASSERT_OK(tablet_peer_->NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
-                                                       &driver));
-
-    ASSERT_OK(driver->ExecuteAsync());
-    apply_started.Wait();
-    ASSERT_TRUE(driver->GetOpId().IsInitialized())
-      << "By the time a transaction is applied, it should have an Opid";
-    // The apply will hang until we CountDown() the continue latch.
-    // Now, roll the log. Below, we execute a few more insertions with rolling.
-    ASSERT_OK(log->AllocateSegmentAndRollOver());
-  }
-
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  // The log anchor is currently equal to the latest OpId written to the Log
-  // because we are delaying the Commit message with the CountDownLatch.
-
-  // GC the first four segments created by the inserts.
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(4, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-
-  // We use mutations here, since an MRS Flush() quiesces the tablet, and we
-  // want to ensure the only thing "anchoring" is the TransactionTracker.
-  ASSERT_OK(ExecuteDeletesAndRollLogs(3));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-  ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-  ASSERT_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-
-  AssertLogAnchorEarlierThanLogLatest();
-
-  // Try to GC(), nothing should be deleted due to the in-flight transaction.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(0, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Now we release the transaction and wait for everything to complete.
-  // We fully quiesce and flush, which should release all anchors.
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  apply_continue.CountDown();
-  rpc_latch.Wait();
-  tablet_peer_->txn_tracker_.WaitForAllToFinish();
-  ASSERT_EQ(0, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-  AssertNoLogAnchors();
-
-  // All should be deleted except the two last segments.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(3, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Running log GC right after Start(), without waiting for leadership, must
-// succeed.
-TEST_F(TabletPeerTest, TestGCEmptyLog) {
-  ConsensusBootstrapInfo info;
-  // Check Start() so that a startup failure is reported here rather than as a
-  // confusing failure from RunLogGC() below.
-  ASSERT_OK(tablet_peer_->Start(info));
-  // We don't wait on consensus on purpose.
-  ASSERT_OK(tablet_peer_->RunLogGC());
-}
-
-// Checks how FlushOpPerfImprovementPolicy scores flushes relative to
-// FLAGS_flush_threshold_mb and the time elapsed since the last flush.
-TEST_F(TabletPeerTest, TestFlushOpsPerfImprovements) {
-  FLAGS_flush_threshold_mb = 64;
-
-  const int64_t kMiB = 1024 * 1024;
-  MaintenanceOpStats stats;
-
-  // Records 'ram_anchored_bytes' in 'stats', asks the policy for a score given
-  // 'elapsed_ms', then resets 'stats' and returns the score.
-  auto perf_improvement_for = [&stats](int64_t ram_anchored_bytes, int64_t elapsed_ms) {
-    stats.set_ram_anchored(ram_anchored_bytes);
-    FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, elapsed_ms);
-    double improvement = stats.perf_improvement();
-    stats.Clear();
-    return improvement;
-  };
-
-  // Exactly at the threshold, and not enough time has passed for a time-based flush.
-  ASSERT_EQ(0.0, perf_improvement_for(64 * kMiB, 1));
-
-  // Exactly at the threshold, and enough time has passed: a small improvement.
-  ASSERT_GT(perf_improvement_for(64 * kMiB, 3 * 60 * 1000), 0.01);
-
-  // Far over the threshold: the score exceeds 1.
-  ASSERT_LT(1.0, perf_improvement_for(128 * kMiB, 1));
-
-  // Below the threshold but there for a long time: approaching 1.0 from below.
-  double long_wait_improvement = perf_improvement_for(30 * kMiB, 60 * 50 * 1000);
-  ASSERT_LT(0.7, long_wait_improvement);
-  ASSERT_GT(1.0, long_wait_improvement);
-}
-
-} // namespace tablet
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
deleted file mode 100644
index d9bddb4..0000000
--- a/src/kudu/tablet/tablet_peer.cc
+++ /dev/null
@@ -1,672 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/tablet_peer.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "kudu/consensus/consensus.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus.h"
-#include "kudu/gutil/mathlimits.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/remote_method.h"
-#include "kudu/rpc/rpc_service.h"
-#include "kudu/rpc/service_pool.h"
-#include "kudu/tablet/transactions/transaction_driver.h"
-#include "kudu/tablet/transactions/alter_schema_transaction.h"
-#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tablet/tablet_metrics.h"
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/logging.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/threadpool.h"
-#include "kudu/util/trace.h"
-
-using std::map;
-using std::shared_ptr;
-using std::unique_ptr;
-
-namespace kudu {
-namespace tablet {
-
-// Per-tablet histograms for the "prepare" thread pool. TabletPeer::Init()
-// instantiates them against the tablet's metric entity and attaches them to
-// prepare_pool_ (queue length, queue time and run time respectively).
-METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",
-                        MetricUnit::kTasks,
-                        "Number of operations waiting to be prepared within this tablet. "
-                        "High queue lengths indicate that the server is unable to process "
-                        "operations as fast as they are being written to the WAL.",
-                        10000, 2);
-
-METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time",
-                        MetricUnit::kMicroseconds,
-                        "Time that operations spent waiting in the prepare queue before being "
-                        "processed. High queue times indicate that the server is unable to "
-                        "process operations as fast as they are being written to the WAL.",
-                        10000000, 2);
-
-METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time",
-                        MetricUnit::kMicroseconds,
-                        "Time that operations spent being prepared in the tablet. "
-                        "High values may indicate that the server is under-provisioned or "
-                        "that operations are experiencing high contention with one another for "
-                        "locks.",
-                        10000000, 2);
-
-using consensus::Consensus;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::ConsensusOptions;
-using consensus::ConsensusRound;
-using consensus::OpId;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using consensus::RaftConsensus;
-using consensus::TimeManager;
-using consensus::ALTER_SCHEMA_OP;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using rpc::Messenger;
-using rpc::ResultTracker;
-using strings::Substitute;
-using tserver::TabletServerErrorPB;
-
-// ============================================================================
-//  Tablet Peer
-// ============================================================================
-// Creates a peer in the NOT_STARTED state for the tablet described by 'meta',
-// with its own fresh LogAnchorRegistry. 'apply_pool' is kept as a raw pointer
-// and is not deleted here (presumably owned by the caller; confirm).
-// 'mark_dirty_clbk' is run with a reason string when the peer's state changes
-// in a way that should be re-reported (for example, from Start()).
-TabletPeer::TabletPeer(const scoped_refptr<TabletMetadata>& meta,
-                       const consensus::RaftPeerPB& local_peer_pb,
-                       ThreadPool* apply_pool,
-                       Callback<void(const std::string& reason)> mark_dirty_clbk)
-    : meta_(meta),
-      tablet_id_(meta->tablet_id()),
-      local_peer_pb_(local_peer_pb),
-      state_(NOT_STARTED),
-      last_status_("Tablet initializing..."),
-      apply_pool_(apply_pool),
-      log_anchor_registry_(new LogAnchorRegistry()),
-      mark_dirty_clbk_(std::move(mark_dirty_clbk)) {}
-
-// A peer may be destroyed only after Shutdown() (which releases tablet_) or if
-// Init() never assigned a tablet. Anything else is a programming error.
-TabletPeer::~TabletPeer() {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  CHECK(!tablet_) << "TabletPeer not fully shut down. State: " << TabletStatePB_Name(state_);
-}
-
-// Binds the peer to its tablet, clock, messenger, result tracker and log, and
-// builds what it drives: a single-threaded "prepare" pool instrumented with the
-// op_prepare_* histograms, and the RaftConsensus instance for this tablet.
-// The peer must be in the BOOTSTRAPPING state (CHECK-enforced).
-//
-// NOTE(review): if ConsensusMetadata::Load() fails, tablet_, log_ and the other
-// members have already been assigned, so Shutdown() must still be called
-// before destruction (the destructor CHECKs that tablet_ is null).
-Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
-                        const scoped_refptr<server::Clock>& clock,
-                        const shared_ptr<Messenger>& messenger,
-                        const scoped_refptr<ResultTracker>& result_tracker,
-                        const scoped_refptr<Log>& log,
-                        const scoped_refptr<MetricEntity>& metric_entity) {
-
-  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
-  DCHECK(log) << "A TabletPeer must be provided with a Log";
-
-  // max_threads(1): prepares for this tablet run one at a time.
-  RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_));
-  prepare_pool_->SetQueueLengthHistogram(
-      METRIC_op_prepare_queue_length.Instantiate(metric_entity));
-  prepare_pool_->SetQueueTimeMicrosHistogram(
-      METRIC_op_prepare_queue_time.Instantiate(metric_entity));
-  prepare_pool_->SetRunTimeMicrosHistogram(
-      METRIC_op_prepare_run_time.Instantiate(metric_entity));
-
-  {
-    // Publish the collaborators and create consensus while holding lock_.
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(BOOTSTRAPPING, state_);
-    tablet_ = tablet;
-    clock_ = clock;
-    messenger_ = messenger;
-    log_ = log;
-    result_tracker_ = result_tracker;
-
-    ConsensusOptions options;
-    options.tablet_id = meta_->tablet_id();
-
-    TRACE("Creating consensus instance");
-
-    unique_ptr<ConsensusMetadata> cmeta;
-    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
-                                          meta_->fs_manager()->uuid(), &cmeta));
-
-    // The time manager is seeded with the tablet's current MVCC clean timestamp.
-    scoped_refptr<TimeManager> time_manager(new TimeManager(
-        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
-
-    consensus_ = RaftConsensus::Create(options,
-                                       std::move(cmeta),
-                                       local_peer_pb_,
-                                       metric_entity,
-                                       time_manager,
-                                       this,
-                                       messenger_,
-                                       log_.get(),
-                                       tablet_->mem_tracker(),
-                                       mark_dirty_clbk_);
-  }
-
-  // Transaction-tracker metrics are only wired up when the tablet has metrics.
-  if (tablet_->metrics() != nullptr) {
-    TRACE("Starting instrumentation");
-    txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
-  }
-  txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
-
-  TRACE("TabletPeer::Init() finished");
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
-  return Status::OK();
-}
-
-// Starts consensus with the bootstrap results and moves the peer from
-// BOOTSTRAPPING to RUNNING (CHECK-enforced). Holds state_change_lock_ for its
-// whole duration; Shutdown() takes the same lock before tearing components down.
-// Returns the error from consensus startup, if any, leaving the state unchanged.
-Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
-  TRACE("Starting consensus");
-
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
-
-  VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
-
-  RETURN_NOT_OK(consensus_->Start(bootstrap_info));
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(state_, BOOTSTRAPPING);
-    state_ = RUNNING;
-  }
-
-  // Because we changed the tablet state, we need to re-report the tablet to the master.
-  mark_dirty_clbk_.Run("Started TabletPeer");
-
-  return Status::OK();
-}
-
-// Returns the committed Raft configuration of this replica.
-//
-// Shutdown() resets consensus_ while holding lock_, so the member is not read
-// directly here. A reference-counted snapshot is taken via shared_consensus(),
-// which acquires lock_. This keeps the instance alive between the null check
-// and the CommittedConfig() call.
-const consensus::RaftConfigPB TabletPeer::RaftConfig() const {
-  scoped_refptr<consensus::Consensus> consensus_snapshot = shared_consensus();
-  CHECK(consensus_snapshot) << "consensus is null";
-  return consensus_snapshot->CommittedConfig();
-}
-
-// Shuts down this peer and its collaborators. If a shutdown is already in
-// progress or finished (state QUIESCING or SHUTDOWN), this waits for the peer
-// to reach SHUTDOWN instead of repeating the work.
-//
-// Order of teardown below: unregister maintenance ops, shut down consensus,
-// wait for in-flight transactions, shut down the prepare pool, close the log,
-// shut down the tablet, then drop consensus_/tablet_ and mark SHUTDOWN.
-void TabletPeer::Shutdown() {
-
-  LOG(INFO) << "Initiating TabletPeer shutdown for tablet: " << tablet_id_;
-
-  {
-    std::unique_lock<simple_spinlock> lock(lock_);
-    if (state_ == QUIESCING || state_ == SHUTDOWN) {
-      // WaitUntilShutdown() takes lock_ itself, so release it before polling.
-      lock.unlock();
-      WaitUntilShutdown();
-      return;
-    }
-    // Published under lock_ so that readers such as CheckRunning() observe the
-    // shutdown immediately, before the slow teardown below.
-    state_ = QUIESCING;
-  }
-
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
-  // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
-  // to ensure that any currently running operation finishes before we proceed with
-  // the rest of the shutdown sequence. In particular, a maintenance operation could
-  // indirectly end up calling into the log, which we are about to shut down.
-  if (tablet_) tablet_->UnregisterMaintenanceOps();
-  UnregisterMaintenanceOps();
-
-  if (consensus_) consensus_->Shutdown();
-
-  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
-  LOG_SLOW_EXECUTION(WARNING, 1000,
-      Substitute("TabletPeer: tablet $0: Waiting for Transactions to complete", tablet_id())) {
-    txn_tracker_.WaitForAllToFinish();
-  }
-
-  if (prepare_pool_) {
-    prepare_pool_->Shutdown();
-  }
-
-  // Log close errors are only warned about; shutdown continues regardless.
-  if (log_) {
-    WARN_NOT_OK(log_->Close(), "Error closing the Log.");
-  }
-
-  if (VLOG_IS_ON(1)) {
-    VLOG(1) << "TabletPeer: tablet " << tablet_id() << " shut down!";
-  }
-
-  if (tablet_) {
-    tablet_->Shutdown();
-  }
-
-  // Only mark the peer as SHUTDOWN when all other components have shut down.
-  // consensus_ and tablet_ are reset here under lock_, so code that reads those
-  // members concurrently must do so under lock_ (or hold its own reference).
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    // Release mem tracker resources.
-    consensus_.reset();
-    tablet_.reset();
-    state_ = SHUTDOWN;
-  }
-}
-
-// Polls every 10ms until the peer's state has become SHUTDOWN.
-void TabletPeer::WaitUntilShutdown() {
-  for (;;) {
-    bool is_shut_down;
-    {
-      std::lock_guard<simple_spinlock> state_lock(lock_);
-      is_shut_down = (state_ == SHUTDOWN);
-    }
-    if (is_shut_down) {
-      return;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-}
-
-// Returns OK iff the peer is RUNNING; otherwise an IllegalState naming the
-// current state.
-Status TabletPeer::CheckRunning() const {
-  TabletStatePB current_state;
-  {
-    std::lock_guard<simple_spinlock> state_lock(lock_);
-    current_state = state_;
-  }
-  if (current_state == RUNNING) {
-    return Status::OK();
-  }
-  return Status::IllegalState(Substitute("The tablet is not in a running state: $0",
-                                         TabletStatePB_Name(current_state)));
-}
-
-// Blocks until this peer is RUNNING and its consensus instance reports
-// IsRunning(). It polls with exponential backoff: 1ms, 2ms, ... capped at
-// 2^kMaxBackoffExp = 256ms between checks.
-//
-// Returns IllegalState as soon as the peer is observed QUIESCING or SHUTDOWN.
-// Returns TimedOut once more than 'timeout' has elapsed.
-//
-// consensus_ is not a set-once object: Shutdown() resets it under lock_.
-// Dereferencing the member after dropping the lock could therefore race with
-// that reset. Instead, a reference-counted snapshot is taken under lock_ and
-// used for the IsRunning() call.
-Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
-  MonoTime start(MonoTime::Now());
-
-  int backoff_exp = 0;
-  const int kMaxBackoffExp = 8;
-  while (true) {
-    scoped_refptr<consensus::Consensus> consensus_snapshot;
-    TabletStatePB cached_state;
-    {
-      std::lock_guard<simple_spinlock> lock(lock_);
-      cached_state = state_;
-      consensus_snapshot = consensus_;
-    }
-    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
-      return Status::IllegalState(
-          Substitute("The tablet is already shutting down or shutdown. State: $0",
-                     TabletStatePB_Name(cached_state)));
-    }
-    if (cached_state == RUNNING && consensus_snapshot && consensus_snapshot->IsRunning()) {
-      break;
-    }
-    MonoTime now(MonoTime::Now());
-    MonoDelta elapsed(now - start);
-    if (elapsed > timeout) {
-      return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State; $1",
-                                         elapsed.ToString(), TabletStatePB_Name(cached_state)));
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
-    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
-  }
-  return Status::OK();
-}
-
-// Wraps 'state' in a leader-side WriteTransaction and starts executing it
-// asynchronously. Fails with IllegalState if the peer is not RUNNING.
-Status TabletPeer::SubmitWrite(unique_ptr<WriteTransactionState> state) {
-  RETURN_NOT_OK(CheckRunning());
-
-  state->SetResultTracker(result_tracker_);
-  gscoped_ptr<Transaction> write_txn(
-      new WriteTransaction(std::move(state), consensus::LEADER));
-
-  scoped_refptr<TransactionDriver> driver;
-  RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(write_txn), &driver));
-  return driver->ExecuteAsync();
-}
-
-// Wraps 'state' in a leader-side AlterSchemaTransaction and starts executing it
-// asynchronously. Fails with IllegalState if the peer is not RUNNING.
-Status TabletPeer::SubmitAlterSchema(unique_ptr<AlterSchemaTransactionState> state) {
-  RETURN_NOT_OK(CheckRunning());
-
-  gscoped_ptr<Transaction> alter_txn(
-      new AlterSchemaTransaction(std::move(state), consensus::LEADER));
-
-  scoped_refptr<TransactionDriver> driver;
-  RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(alter_txn), &driver));
-  return driver->ExecuteAsync();
-}
-
-// Fills 'status_pb_out' (must be non-null) with a snapshot, taken under lock_,
-// of this peer's identity and state. When a tablet is attached, it also
-// includes the tablet's estimated on-disk size.
-void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) const {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  DCHECK(status_pb_out != nullptr);
-  TabletStatusPB& pb = *status_pb_out;
-
-  pb.set_tablet_id(meta_->tablet_id());
-  pb.set_table_name(meta_->table_name());
-  pb.set_last_status(last_status_);
-  meta_->partition().ToPB(pb.mutable_partition());
-  pb.set_state(state_);
-  pb.set_tablet_data_state(meta_->tablet_data_state());
-  if (tablet_ != nullptr) {
-    pb.set_estimated_on_disk_size(tablet_->EstimateOnDiskSize());
-  }
-}
-
-// Garbage-collects log segments that the current retention indexes no longer
-// need. This is best-effort: it is a no-op on a non-running peer, and GC
-// failures are logged rather than propagated, so the result is always OK.
-Status TabletPeer::RunLogGC() {
-  if (!CheckRunning().ok()) {
-    return Status::OK();
-  }
-
-  int32_t num_gced;
-  log::RetentionIndexes retention = GetRetentionIndexes();
-  Status gc_status = log_->GC(retention, &num_gced);
-  if (gc_status.ok()) {
-    return Status::OK();
-  }
-
-  LOG(ERROR) << gc_status.CloneAndPrepend(
-      "Unexpected error while running Log GC from TabletPeer").ToString();
-  return Status::OK();
-}
-
-// TabletStatusListener hook: records 'status' as this peer's latest
-// human-readable status.
-void TabletPeer::StatusMessage(const std::string& status) {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  last_status_ = status;
-}
-
-// Returns a copy, taken under lock_, of the latest human-readable status.
-string TabletPeer::last_status() const {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  return last_status_;
-}
-
-void TabletPeer::SetFailed(const Status& error) {
-  std::lock_guard<simple_spinlock> lock(lock_);
-  CHECK(!error.ok());
-  state_ = FAILED;
-  error_ = error;
-  last_status_ = error.ToString();
-}
-
-// Returns a short state description for UIs:
-//  - FAILED peers: "<runtime state> (<data state>): <error>", since anything
-//    could have gone wrong;
-//  - data state other than TABLET_DATA_READY (e.g. copying, tombstoned): the
-//    data state name, as that is the important thing to show;
-//  - otherwise: the runtime state name (BOOTSTRAPPING, RUNNING, etc).
-string TabletPeer::HumanReadableState() const {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  const TabletDataState data_state = meta_->tablet_data_state();
-
-  if (state_ == FAILED) {
-    return Substitute("$0 ($1): $2", TabletStatePB_Name(state_),
-                      TabletDataState_Name(data_state), error_.ToString());
-  }
-  if (data_state != TABLET_DATA_READY) {
-    return TabletDataState_Name(data_state);
-  }
-  return TabletStatePB_Name(state_);
-}
-
-// Appends one TransactionStatusPB to 'out' for every pending transaction whose
-// driver has state attached. The trace buffer is included only for TRACE_TXNS.
-void TabletPeer::GetInFlightTransactions(Transaction::TraceType trace_type,
-                                         vector<consensus::TransactionStatusPB>* out) const {
-  vector<scoped_refptr<TransactionDriver>> pending;
-  txn_tracker_.GetPendingTransactions(&pending);
-
-  for (const auto& driver : pending) {
-    if (driver->state() == nullptr) {
-      continue;
-    }
-
-    consensus::TransactionStatusPB status_pb;
-    status_pb.mutable_op_id()->CopyFrom(driver->GetOpId());
-    switch (driver->tx_type()) {
-      case Transaction::WRITE_TXN:
-        status_pb.set_tx_type(consensus::WRITE_OP);
-        break;
-      case Transaction::ALTER_SCHEMA_TXN:
-        status_pb.set_tx_type(consensus::ALTER_SCHEMA_OP);
-        break;
-    }
-    status_pb.set_description(driver->ToString());
-
-    const MonoDelta running_for = MonoTime::Now() - driver->start_time();
-    status_pb.set_running_for_micros(running_for.ToMicroseconds());
-
-    if (trace_type == Transaction::TRACE_TXNS) {
-      status_pb.set_trace_buffer(driver->trace()->DumpToString());
-    }
-    out->push_back(status_pb);
-  }
-}
-
-// Computes the log indexes that must be retained.
-//
-// It starts from consensus' own minimum, which protects operations still in
-// flight and those needed to catch up lagging peers. It then lowers
-// for_durability to the earliest anchor registered in the LogAnchorRegistry and
-// to the earliest op index of any pending transaction that already has an OpId.
-log::RetentionIndexes TabletPeer::GetRetentionIndexes() const {
-  log::RetentionIndexes ret = consensus_->GetRetentionIndexes();
-
-  // Nothing has ever been written to the log, so there is nothing to anchor.
-  if (ret.for_durability == 0) {
-    return ret;
-  }
-
-  // NotFound from the registry just means that no anchors are registered.
-  int64_t min_anchor_index;
-  const Status anchor_status =
-      log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index);
-  if (anchor_status.ok()) {
-    ret.for_durability = std::min(ret.for_durability, min_anchor_index);
-  } else {
-    DCHECK(anchor_status.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: "
-                                       << anchor_status.ToString();
-  }
-
-  vector<scoped_refptr<TransactionDriver>> pending;
-  txn_tracker_.GetPendingTransactions(&pending);
-  for (const auto& driver : pending) {
-    const OpId op_id = driver->GetOpId();
-    // Without an OpId, the transaction has not been submitted for replication
-    // yet and has no need to anchor the log.
-    if (!op_id.IsInitialized()) {
-      continue;
-    }
-    ret.for_durability = std::min(ret.for_durability, op_id.index());
-  }
-
-  return ret;
-}
-
-// Fills 'replay_size_map' via Log::GetReplaySizeMap(). Fails with IllegalState
-// if the peer is not RUNNING.
-Status TabletPeer::GetReplaySizeMap(map<int64_t, int64_t>* replay_size_map) const {
-  Status running = CheckRunning();
-  if (!running.ok()) {
-    return running;
-  }
-  log_->GetReplaySizeMap(replay_size_map);
-  return Status::OK();
-}
-
-// Stores in 'retention_size' the number of log bytes that a GC with the current
-// retention indexes would free. Fails with IllegalState if the peer is not RUNNING.
-Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
-  Status running = CheckRunning();
-  if (!running.ok()) {
-    return running;
-  }
-  const log::RetentionIndexes retention = GetRetentionIndexes();
-  *retention_size = log_->GetGCableDataSize(retention);
-  return Status::OK();
-}
-
-// ReplicaTransactionFactory hook, used by consensus to start a replica-side
-// transaction for 'round'. It builds the WRITE_OP or ALTER_SCHEMA_OP
-// transaction described by the round's ReplicateMsg, wraps it in a replica
-// TransactionDriver, and starts asynchronous execution.
-//
-// Returns IllegalState unless the peer is RUNNING or BOOTSTRAPPING. Any other
-// op type is a fatal error.
-Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
-      return Status::IllegalState(TabletStatePB_Name(state_));
-    }
-  }
-
-  consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
-  DCHECK(replicate_msg->has_timestamp());
-  gscoped_ptr<Transaction> transaction;
-  switch (replicate_msg->op_type()) {
-    case WRITE_OP:
-    {
-      DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
-          " transaction must receive a WriteRequestPB";
-      // The request id is optional in the message; pass nullptr when absent.
-      unique_ptr<WriteTransactionState> tx_state(
-          new WriteTransactionState(
-              this,
-              &replicate_msg->write_request(),
-              replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
-      tx_state->SetResultTracker(result_tracker_);
-
-      transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
-      break;
-    }
-    case ALTER_SCHEMA_OP:
-    {
-      DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
-          " transaction must receive an AlterSchemaRequestPB";
-      unique_ptr<AlterSchemaTransactionState> tx_state(
-          new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(),
-                                          nullptr));
-      transaction.reset(
-          new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA));
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unsupported Operation Type";
-  }
-
-  // TODO(todd) Look at wiring the stuff below on the driver
-  // Grab the raw state pointer BEFORE 'transaction' is moved into the driver
-  // below; the driver then owns the transaction (and thus 'state').
-  TransactionState* state = transaction->state();
-  state->set_consensus_round(round);
-
-  scoped_refptr<TransactionDriver> driver;
-  RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
-
-  // Unretained is required to avoid a refcount cycle.
-  state->consensus_round()->SetConsensusReplicatedCallback(
-      Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
-
-  RETURN_NOT_OK(driver->ExecuteAsync());
-  return Status::OK();
-}
-
-// Creates a TransactionDriver wired to this peer's tracker, consensus, log,
-// prepare/apply pools and order verifier. It initializes the driver to run
-// 'transaction' in the LEADER role and hands it back through 'driver'.
-Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                              scoped_refptr<TransactionDriver>* driver) {
-  scoped_refptr<TransactionDriver> new_driver(new TransactionDriver(&txn_tracker_,
-                                                                    consensus_.get(),
-                                                                    log_.get(),
-                                                                    prepare_pool_.get(),
-                                                                    apply_pool_,
-                                                                    &txn_order_verifier_));
-  Status init_status = new_driver->Init(std::move(transaction), consensus::LEADER);
-  if (!init_status.ok()) {
-    return init_status;
-  }
-  driver->swap(new_driver);
-  return Status::OK();
-}
-
-// Creates a TransactionDriver wired to this peer's tracker, consensus, log,
-// prepare/apply pools and order verifier. It initializes the driver to run
-// 'transaction' in the REPLICA role and hands it back through 'driver'.
-Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                               scoped_refptr<TransactionDriver>* driver) {
-  scoped_refptr<TransactionDriver> new_driver(new TransactionDriver(&txn_tracker_,
-                                                                    consensus_.get(),
-                                                                    log_.get(),
-                                                                    prepare_pool_.get(),
-                                                                    apply_pool_,
-                                                                    &txn_order_verifier_));
-  Status init_status = new_driver->Init(std::move(transaction), consensus::REPLICA);
-  if (!init_status.ok()) {
-    return init_status;
-  }
-  driver->swap(new_driver);
-  return Status::OK();
-}
-
-// Registers this peer's maintenance ops (MRS flush, DMS flush, log GC) with
-// 'maint_mgr', then the tablet's own ops via Tablet::RegisterMaintenanceOps().
-// The created ops are kept in maintenance_ops_ so that UnregisterMaintenanceOps()
-// can unregister and free them. If the peer is not RUNNING, it only logs a
-// warning.
-void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
-  // Taking state_change_lock_ ensures that we don't shut down concurrently with
-  // this last start-up task.
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
-
-  if (state() != RUNNING) {
-    // Identify the tablet by id: streaming 'tablet_' (a std::shared_ptr) would
-    // only print a pointer address.
-    LOG(WARNING) << "Not registering maintenance operations for " << tablet_id()
-                 << ": tablet not in RUNNING state";
-    return;
-  }
-
-  DCHECK(maintenance_ops_.empty());
-
-  gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
-  maint_mgr->RegisterOp(mrs_flush_op.get());
-  maintenance_ops_.push_back(mrs_flush_op.release());
-
-  gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
-  maint_mgr->RegisterOp(dms_flush_op.get());
-  maintenance_ops_.push_back(dms_flush_op.release());
-
-  gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
-  maint_mgr->RegisterOp(log_gc.get());
-  maintenance_ops_.push_back(log_gc.release());
-
-  tablet_->RegisterMaintenanceOps(maint_mgr);
-}
-
-// Unregisters every op created by RegisterMaintenanceOps(), then deletes them
-// and clears maintenance_ops_. The caller must hold state_change_lock_; this is
-// DCHECKed.
-void TabletPeer::UnregisterMaintenanceOps() {
-  DCHECK(state_change_lock_.is_locked());
-  for (auto* registered_op : maintenance_ops_) {
-    registered_op->Unregister();
-  }
-  STLDeleteElements(&maintenance_ops_);
-}
-
-// Waits for transactions that have started applying to commit in MVCC, then
-// waits for the log to flush everything queued so far. Returns the error from
-// Log::WaitUntilAllFlushed() if that fails, and OK otherwise. The full rationale
-// is in the comment at the top of the body.
-Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
-  // This callback is triggered prior to any TabletMetadata flush.
-  // The guarantee that we are trying to enforce is this:
-  //
-  //   If an operation has been flushed to stable storage (eg a DRS or DeltaFile)
-  //   then its COMMIT message must be present in the log.
-  //
-  // The purpose for this is so that, during bootstrap, we can accurately identify
-  // whether each operation has been flushed. If we don't see a COMMIT message for
-  // an operation, then we assume it was not completely applied and needs to be
-  // re-applied. Thus, if we had something on disk but with no COMMIT message,
-  // we'd attempt to double-apply the write, resulting in an error (eg trying to
-  // delete an already-deleted row).
-  //
-  // So, to enforce this property, we do two steps:
-  //
-  // 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC.
-  //
-  // Because the operations always enqueue their COMMIT message to the log
-  // before calling Commit(), this ensures that any in-flight operations have
-  // their commit messages "en route".
-  //
-  // NOTE: we only wait for those operations that have started their Apply() phase.
-  // Any operations which haven't yet started applying haven't made any changes
-  // to in-memory state: thus, they obviously couldn't have made any changes to
-  // on-disk storage either (data can only get to the disk by going through an in-memory
-  // store). Only those that have started Apply() could have potentially written some
-  // data which is now on disk.
-  //
-  // Perhaps more importantly, if we waited on operations that hadn't started their
-  // Apply() phase, we might be waiting forever -- for example, if a follower has been
-  // partitioned from its leader, it may have operations sitting around in flight
-  // for quite a long time before eventually aborting or committing. This would
-  // end up blocking all flushes if we waited on it.
-  //
-  // 2) Flush the log
-  //
-  // This ensures that the above-mentioned commit messages are not just enqueued
-  // to the log, but also on disk.
-  VLOG(1) << "T " << tablet_->metadata()->tablet_id()
-      <<  ": Waiting for in-flight transactions to commit.";
-  // NOTE(review): LOG_SLOW_EXECUTION presumably warns only when the enclosed
-  // block exceeds the given number of milliseconds (200 here); confirm against
-  // the macro definition.
-  LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") {
-    tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit();
-  }
-  VLOG(1) << "T " << tablet_->metadata()->tablet_id()
-      << ": Waiting for the log queue to be flushed.";
-  LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") {
-    // Returns directly from inside the macro's scope on a flush error.
-    RETURN_NOT_OK(log_->WaitUntilAllFlushed());
-  }
-  return Status::OK();
-}
-
-
-}  // namespace tablet
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
deleted file mode 100644
index d622626..0000000
--- a/src/kudu/tablet/tablet_peer.h
+++ /dev/null
@@ -1,377 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_TABLET_TABLET_PEER_H_
-#define KUDU_TABLET_TABLET_PEER_H_
-
-#include <map>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <vector>
-
-#include "kudu/consensus/consensus.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/tablet/tablet.h"
-#include "kudu/tablet/transaction_order_verifier.h"
-#include "kudu/tablet/transactions/transaction_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/semaphore.h"
-
-namespace kudu {
-
-namespace log {
-class LogAnchorRegistry;
-}
-
-namespace rpc {
-class Messenger;
-class ResultTracker;
-}
-
-namespace tserver {
-class CatchUpServiceTest;
-}
-
-class MaintenanceManager;
-class MaintenanceOp;
-
-namespace tablet {
-class LeaderTransactionDriver;
-class ReplicaTransactionDriver;
-class TabletPeer;
-class TabletStatusPB;
-class TabletStatusListener;
-class TransactionDriver;
-
-// Callback interface through which tablet-related components report their
-// status back to TabletPeer. It avoids a circular class dependency on TabletPeer
-// and lets those components be tested without constructing a TabletPeer.
-class TabletStatusListener {
- public:
-  virtual ~TabletStatusListener() = default;
-
-  // Reports a human-readable status message.
-  virtual void StatusMessage(const std::string& status) = 0;
-};
-
-// A peer in a tablet consensus configuration, which coordinates writes to tablets.
-// Each time SubmitWrite() is called this class appends a new entry to a replicated
-// state machine through a consensus algorithm, which makes sure that other
-// peers see the same updates in the same order. In addition to this, this
-// class also splits the work and coordinates multi-threaded execution.
-//
-// Lifecycle: SetBootstrapping() moves NOT_STARTED -> BOOTSTRAPPING, Start()
-// moves BOOTSTRAPPING -> RUNNING, SetFailed() moves to FAILED, and Shutdown()
-// moves through QUIESCING to SHUTDOWN.
-class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
-                   public consensus::ReplicaTransactionFactory,
-                   public TabletStatusListener {
- public:
-  TabletPeer(const scoped_refptr<TabletMetadata>& meta,
-             const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool,
-             Callback<void(const std::string& reason)> mark_dirty_clbk);
-
-  // Initializes the TabletPeer, namely creating the Log and initializing
-  // Consensus.
-  Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
-              const scoped_refptr<server::Clock>& clock,
-              const std::shared_ptr<rpc::Messenger>& messenger,
-              const scoped_refptr<rpc::ResultTracker>& result_tracker,
-              const scoped_refptr<log::Log>& log,
-              const scoped_refptr<MetricEntity>& metric_entity);
-
-  // Starts the TabletPeer, making it available for writes. If this
-  // TabletPeer is part of a consensus configuration this will connect it to other peers
-  // in the consensus configuration.
-  Status Start(const consensus::ConsensusBootstrapInfo& info);
-
-  // Shuts down this tablet peer.
-  // If a shutdown is already in progress, blocks until that shutdown is complete.
-  void Shutdown();
-
-  // Checks that the tablet is in a RUNNING state.
-  Status CheckRunning() const;
-
-  // Waits until the tablet is RUNNING and its consensus is running, or until
-  // 'timeout' elapses.
-  // TODO have a way to wait for any state?
-  Status WaitUntilConsensusRunning(const MonoDelta& timeout);
-
-  // Submits a write to a tablet and executes it asynchronously.
-  // The caller is expected to build and pass a WriteTransactionState that points
-  // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
-  // MvccManager.
-  Status SubmitWrite(std::unique_ptr<WriteTransactionState> tx_state);
-
-  // Called by the tablet service to start an alter schema transaction.
-  //
-  // The transaction contains all the information required to execute the
-  // AlterSchema operation and send the response back.
-  //
-  // If the returned Status is OK, the response to the client will be sent
-  // asynchronously. Otherwise the tablet service will have to send the response directly.
-  //
-  // The AlterSchema operation is taking the tablet component lock in exclusive mode
-  // meaning that no other operation on the tablet can be executed while the
-  // AlterSchema is in progress.
-  Status SubmitAlterSchema(std::unique_ptr<AlterSchemaTransactionState> tx_state);
-
-  void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
-
-  // Used by consensus to create and start a new ReplicaTransaction.
-  virtual Status StartReplicaTransaction(
-      const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
-
-  // Returns the consensus instance, or nullptr after Shutdown(). The raw pointer
-  // is not kept alive by the peer once Shutdown() resets consensus_.
-  consensus::Consensus* consensus() {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return consensus_.get();
-  }
-
-  // Returns a reference-counted handle to the consensus instance (null after
-  // Shutdown()).
-  scoped_refptr<consensus::Consensus> shared_consensus() const {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return consensus_;
-  }
-
-  // Returns the tablet, or nullptr after Shutdown().
-  Tablet* tablet() const {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return tablet_.get();
-  }
-
-  // Dereferences consensus_ without a null check, so this must not be called
-  // after Shutdown() has reset it.
-  scoped_refptr<consensus::TimeManager> time_manager() const {
-    return consensus_->time_manager();
-  }
-
-  std::shared_ptr<Tablet> shared_tablet() const {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return tablet_;
-  }
-
-  TabletStatePB state() const {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return state_;
-  }
-
-  // Returns the current Raft configuration.
-  const consensus::RaftConfigPB RaftConfig() const;
-
-  // If any peers in the consensus configuration lack permanent uuids, get them via an
-  // RPC call and update.
-  // TODO: move this to raft_consensus.h.
-  Status UpdatePermanentUuids();
-
-  // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
-  void SetBootstrapping() {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(NOT_STARTED, state_);
-    state_ = BOOTSTRAPPING;
-  }
-
-  // Implementation of TabletStatusListener::StatusMessage().
-  void StatusMessage(const std::string& status) override;
-
-  // Retrieve the last human-readable status of this tablet peer.
-  std::string last_status() const;
-
-  // Sets the tablet state to FAILED additionally setting the error to the provided
-  // one.
-  void SetFailed(const Status& error);
-
-  // Returns the error that occurred, when state is FAILED.
-  Status error() const {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return error_;
-  }
-
-  // Returns a human-readable string indicating the state of the tablet.
-  // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING",
-  // etc. For use in places like the Web UI.
-  std::string HumanReadableState() const;
-
-  // Adds list of transactions in-flight at the time of the call to 'out'.
-  void GetInFlightTransactions(Transaction::TraceType trace_type,
-                               std::vector<consensus::TransactionStatusPB>* out) const;
-
-  // Returns the log indexes to be retained for durability and to catch up peers.
-  // Used for selection of log segments to delete during Log GC.
-  log::RetentionIndexes GetRetentionIndexes() const;
-
-  // See Log::GetReplaySizeMap(...).
-  //
-  // Returns a non-ok status if the tablet isn't running.
-  Status GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size_map) const;
-
-  // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
-  //
-  // Returns a non-ok status if the tablet isn't running.
-  Status GetGCableDataSize(int64_t* retention_size) const;
-
-  // Return a pointer to the Log.
-  // TabletPeer keeps a reference to Log after Init().
-  log::Log* log() const {
-    return log_.get();
-  }
-
-  server::Clock* clock() {
-    return clock_.get();
-  }
-
-  const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry() const {
-    return log_anchor_registry_;
-  }
-
-  // Returns the tablet_id of the tablet managed by this TabletPeer.
-  // Returns the correct tablet_id even if the underlying tablet is not available
-  // yet.
-  const std::string& tablet_id() const { return tablet_id_; }
-
-  // Convenience method to return the permanent_uuid of this peer.
-  // Reads it from the const meta_ rather than tablet_, which is reset to null
-  // during Shutdown(), so this remains safe to call after shutdown.
-  std::string permanent_uuid() const { return meta_->fs_manager()->uuid(); }
-
-  Status NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                    scoped_refptr<TransactionDriver>* driver);
-
-  Status NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                     scoped_refptr<TransactionDriver>* driver);
-
-  // Tells the tablet's log to garbage collect.
-  Status RunLogGC();
-
-  // Register the maintenance ops associated with this peer's tablet, also invokes
-  // Tablet::RegisterMaintenanceOps().
-  void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
-
-  // Unregister the maintenance ops associated with this peer's tablet.
-  // The caller must hold state_change_lock_ (DCHECKed in the implementation).
-  void UnregisterMaintenanceOps();
-
-  // Return pointer to the transaction tracker for this peer.
-  const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
-
-  const scoped_refptr<TabletMetadata>& tablet_metadata() const {
-    return meta_;
-  }
-
-  // Marks the tablet as dirty so that it's included in the next heartbeat.
-  void MarkTabletDirty(const std::string& reason) {
-    mark_dirty_clbk_.Run(reason);
-  }
-
- private:
-  friend class RefCountedThreadSafe<TabletPeer>;
-  friend class TabletPeerTest;
-  FRIEND_TEST(TabletPeerTest, TestMRSAnchorPreventsLogGC);
-  FRIEND_TEST(TabletPeerTest, TestDMSAnchorPreventsLogGC);
-  FRIEND_TEST(TabletPeerTest, TestActiveTransactionPreventsLogGC);
-
-  ~TabletPeer();
-
-  // Wait until the TabletPeer is fully in SHUTDOWN state.
-  void WaitUntilShutdown();
-
-  // After bootstrap is complete and consensus is setup this initiates the transactions
-  // that were not complete on bootstrap.
-  // Not implemented yet. See .cc file.
-  Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
-                                  const consensus::ConsensusBootstrapInfo& bootstrap_info);
-
-  const scoped_refptr<TabletMetadata> meta_;
-
-  const std::string tablet_id_;
-
-  const consensus::RaftPeerPB local_peer_pb_;
-
-  TabletStatePB state_;
-  Status error_;
-  TransactionTracker txn_tracker_;
-  TransactionOrderVerifier txn_order_verifier_;
-  scoped_refptr<log::Log> log_;
-  std::shared_ptr<Tablet> tablet_;
-  std::shared_ptr<rpc::Messenger> messenger_;
-  scoped_refptr<consensus::Consensus> consensus_;
-  simple_spinlock prepare_replicate_lock_;
-
-  // Lock protecting state_, last_status_, as well as smart pointers to collaborating
-  // classes such as tablet_ and consensus_.
-  mutable simple_spinlock lock_;
-
-  // The human-readable last status of the tablet, displayed on the web page, command line
-  // tools, etc.
-  std::string last_status_;
-
-  // Lock taken during Init/Shutdown which ensures that only a single thread
-  // attempts to perform major lifecycle operations (Init/Shutdown) at once.
-  // This must be acquired before acquiring lock_ if they are acquired together.
-  // We don't just use lock_ since the lifecycle operations may take a while
-  // and we'd like other threads to be able to quickly poll the state_ variable
-  // during them in order to reject RPCs, etc.
-  mutable simple_spinlock state_change_lock_;
-
-  // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_'
-  // is single-threaded, moving to a multi-tablet setup where multiple TabletPeers
-  // use the same 'prepare_pool_' needs to enforce that, for a single
-  // TabletPeer, PrepareTasks are executed *serially*.
-  // TODO move the prepare pool to TabletServer.
-  gscoped_ptr<ThreadPool> prepare_pool_;
-
-  // Pool that executes apply tasks for transactions. This is a multi-threaded
-  // pool, constructor-injected by either the Master (for system tables) or
-  // the Tablet server.
-  ThreadPool* apply_pool_;
-
-  scoped_refptr<server::Clock> clock_;
-
-  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
-
-  // Function to mark this TabletPeer's tablet as dirty in the TSTabletManager.
-  //
-  // Must be called whenever cluster membership or leadership changes, or when
-  // the tablet's schema changes.
-  Callback<void(const std::string& reason)> mark_dirty_clbk_;
-
-  // List of maintenance operations for the tablet that need information that only the peer
-  // can provide.
-  std::vector<MaintenanceOp*> maintenance_ops_;
-
-  // The result tracker for writes.
-  scoped_refptr<rpc::ResultTracker> result_tracker_;
-
-  DISALLOW_COPY_AND_ASSIGN(TabletPeer);
-};
-
-// Callback that waits for in-flight transactions to complete and then flushes
-// the Log.
-//
-// The tablet is held as a raw pointer on purpose. This callback is stored in
-// the TabletMetadata, so holding the tablet through a shared_ptr would create
-// the cycle callback -> tablet -> metadata -> callback. Because the tablet
-// indirectly owns this callback, the tablet must still be alive when it fires.
-class FlushInflightsToLogCallback : public RefCountedThreadSafe<FlushInflightsToLogCallback> {
- public:
-  FlushInflightsToLogCallback(Tablet* tablet, const scoped_refptr<log::Log>& log)
-      : tablet_(tablet),
-        log_(log) {}
-
-  // Waits for applying transactions to commit, then waits for the log to flush.
-  Status WaitForInflightsAndFlushLog();
-
- private:
-  Tablet* tablet_;  // Not owned; see the class comment.
-  scoped_refptr<log::Log> log_;
-};
-
-
-}  // namespace tablet
-}  // namespace kudu
-
-#endif /* KUDU_TABLET_TABLET_PEER_H_ */

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
deleted file mode 100644
index 508d69f..0000000
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <map>
-#include <mutex>
-#include <string>
-
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet_metrics.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/metrics.h"
-
-// Anchored-memory threshold (in MB) above which FlushOpPerfImprovementPolicy
-// scores a flush by how far the anchored memory exceeds it.
-DEFINE_int32(flush_threshold_mb, 1024,
-             "Size at which MemRowSet flushes are triggered. "
-             "A MRS can still flush below this threshold if it hasn't flushed in a while, "
-             "or if the server-wide memory limit has been reached.");
-TAG_FLAG(flush_threshold_mb, experimental);
-
-// Once this many seconds have elapsed since the last flush, a flush op gets a
-// small time-based perf-improvement score even when below the size threshold.
-DEFINE_int32(flush_threshold_secs, 2 * 60,
-             "Number of seconds after which a non-empty MemRowSet will become flushable "
-             "even if it is not large.");
-TAG_FLAG(flush_threshold_secs, experimental);
-
-
-// Metrics for LogGCOp; both are instantiated against the tablet's metric
-// entity in the LogGCOp constructor.
-METRIC_DEFINE_gauge_uint32(tablet, log_gc_running, "Log GCs Running",
-                           kudu::MetricUnit::kOperations,
-                           "Number of log GC operations currently running.");
-METRIC_DEFINE_histogram(tablet, log_gc_duration, "Log GC Duration",
-                        kudu::MetricUnit::kMilliseconds,
-                        "Time spent garbage collecting the logs.",
-                        60000LU, 1);
-
-namespace kudu {
-namespace tablet {
-
-using std::map;
-using strings::Substitute;
-
-// Elapsed time (one hour, in milliseconds) at which the time-based flush
-// perf-improvement score reaches its cap of 1.0.
-constexpr double kFlushUpperBoundMs = 60 * 60 * 1000;
-
-//
-// FlushOpPerfImprovementPolicy.
-//
-
-// Scores a flush op in 'stats' from its anchored memory and from 'elapsed_ms',
-// the time since the op last flushed:
-//  - when the anchored memory exceeds --flush_threshold_mb, the score is the
-//    number of MB by which it exceeds the threshold (1.0 per extra MB);
-//  - otherwise, once more than --flush_threshold_secs have elapsed, the score
-//    grows linearly with elapsed time and is capped at 1.0 (reached at
-//    kFlushUpperBoundMs);
-//  - otherwise 'stats' is left untouched.
-void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats,
-                                                              double elapsed_ms) {
-  // Compare in floating-point MB. Computing the threshold in bytes as
-  // 'FLAGS_flush_threshold_mb * 1024 * 1024' in int arithmetic overflows
-  // (undefined behavior) for thresholds of 2048 MB and above.
-  const double anchored_mb = static_cast<double>(stats->ram_anchored()) / (1024 * 1024);
-  const double threshold_mb = FLAGS_flush_threshold_mb;
-  if (anchored_mb > threshold_mb) {
-    // If we're over the user-specified flush threshold, then consider the perf
-    // improvement to be 1 for every extra MB.  This produces perf_improvement results
-    // which are much higher than any compaction would produce, and means that, when
-    // there is an MRS over threshold, a flush will almost always be selected instead of
-    // a compaction.  That's not necessarily a good thing, but in the absence of better
-    // heuristics, it will do for now.
-    //
-    // The excess is anchored minus threshold. Subtracting in the other order
-    // yields a negative (or, for an unsigned ram_anchored(), wrapped-around)
-    // score exactly when a flush is most needed.
-    const double extra_mb = anchored_mb - threshold_mb;
-    stats->set_perf_improvement(extra_mb);
-  } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000.0) {
-    // Even if we aren't over the threshold, consider flushing if we haven't flushed
-    // in a long time. But, don't give it a large perf_improvement score. We should
-    // only do this if we really don't have much else to do, and if we've already waited a bit.
-    // The following will give an improvement that's between 0.0 and 1.0, gradually growing
-    // as 'elapsed_ms' approaches 'kFlushUpperBoundMs'. The seconds-to-ms
-    // conversion above is done in double so large flag values cannot overflow.
-    stats->set_perf_improvement(std::min(elapsed_ms / kFlushUpperBoundMs, 1.0));
-  }
-}
-
-//
-// FlushMRSOp.
-//
-
-// Reports the MRS's anchored memory, the log bytes it retains, whether a flush
-// could start right now, and a perf-improvement score. Leaves 'stats'
-// untouched when the MRS is empty or the log replay sizes are unavailable.
-void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> guard(lock_);
-
-  if (tablet_peer_->tablet()->MemRowSetEmpty()) {
-    return;  // Nothing to flush.
-  }
-  map<int64_t, int64_t> replay_sizes;
-  if (!tablet_peer_->GetReplaySizeMap(&replay_sizes).ok()) {
-    return;
-  }
-
-  // Probe the flush semaphore without blocking: the op is runnable only if the
-  // semaphore is currently free. The probe is released when the scope ends.
-  bool can_acquire = false;
-  {
-    std::unique_lock<Semaphore> probe(tablet_peer_->tablet()->rowsets_flush_sem_,
-                                      std::defer_lock);
-    can_acquire = probe.try_lock();
-  }
-  stats->set_runnable(can_acquire);
-
-  stats->set_ram_anchored(tablet_peer_->tablet()->MemRowSetSize());
-  stats->set_logs_retained_bytes(
-      tablet_peer_->tablet()->MemRowSetLogReplaySize(replay_sizes));
-
-  // TODO(todd): use workload statistics here to find out how "hot" the tablet has
-  // been in the last 5 minutes.
-  const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
-}
-
-// Claims the rowsets flush semaphore without blocking and reports whether that
-// succeeded. Perform() releases it, so at most one FlushMRSOp runs at once.
-bool FlushMRSOp::Prepare() {
-  const bool acquired = tablet_peer_->tablet()->rowsets_flush_sem_.try_lock();
-  return acquired;
-}
-
-// Flushes the MRS. Expects the rowsets flush semaphore to be held already
-// (taken by Prepare()) and releases it once the flush has completed.
-void FlushMRSOp::Perform() {
-  // try_lock() must fail here, because Prepare() already holds the semaphore.
-  CHECK(!tablet_peer_->tablet()->rowsets_flush_sem_.try_lock());
-
-  KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushUnlocked(),
-                        Substitute("FlushMRS failed on $0", tablet_peer_->tablet_id()));
-
-  // Restart the "time since last flush" clock under the lock that guards it.
-  std::unique_lock<simple_spinlock> guard(lock_);
-  time_since_flush_.start();
-  guard.unlock();
-
-  tablet_peer_->tablet()->rowsets_flush_sem_.unlock();
-}
-
-// Both metrics live in the tablet's metrics object.
-scoped_refptr<Histogram> FlushMRSOp::DurationHistogram() const {
-  const auto* metrics = tablet_peer_->tablet()->metrics();
-  return metrics->flush_mrs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t>> FlushMRSOp::RunningGauge() const {
-  const auto* metrics = tablet_peer_->tablet()->metrics();
-  return metrics->flush_mrs_running;
-}
-
-//
-// FlushDeltaMemStoresOp.
-//
-
-// Reports the size and log retention of the DMS that
-// Tablet::GetInfoForBestDMSToFlush() picks, plus a perf-improvement score.
-// Leaves 'stats' untouched when DeltaMemRowSetEmpty() is true or the log
-// replay sizes are unavailable.
-void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> guard(lock_);
-
-  if (tablet_peer_->tablet()->DeltaMemRowSetEmpty()) {
-    return;
-  }
-  map<int64_t, int64_t> replay_sizes;
-  if (!tablet_peer_->GetReplaySizeMap(&replay_sizes).ok()) {
-    return;
-  }
-
-  // Out-params written by the tablet for its best flush candidate;
-  // zero-initialized defensively.
-  int64_t best_dms_size = 0;
-  int64_t best_dms_retained_bytes = 0;
-  tablet_peer_->tablet()->GetInfoForBestDMSToFlush(replay_sizes,
-                                                   &best_dms_size, &best_dms_retained_bytes);
-
-  stats->set_runnable(true);
-  stats->set_ram_anchored(best_dms_size);
-  stats->set_logs_retained_bytes(best_dms_retained_bytes);
-
-  const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
-}
-
-// Flushes the single DMS with the highest log retention. If the log replay
-// sizes cannot be obtained, logs a warning and does nothing.
-void FlushDeltaMemStoresOp::Perform() {
-  map<int64_t, int64_t> replay_sizes;
-  const bool have_replay_sizes = tablet_peer_->GetReplaySizeMap(&replay_sizes).ok();
-  if (!have_replay_sizes) {
-    LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
-    return;
-  }
-
-  KUDU_CHECK_OK_PREPEND(
-      tablet_peer_->tablet()->FlushDMSWithHighestRetention(replay_sizes),
-      Substitute("Failed to flush DMS on $0", tablet_peer_->tablet()->tablet_id()));
-
-  // Restart the "time since last flush" clock under the lock that guards it.
-  std::unique_lock<simple_spinlock> guard(lock_);
-  time_since_flush_.start();
-}
-
-// Both metrics live in the tablet's metrics object.
-scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
-  const auto* metrics = tablet_peer_->tablet()->metrics();
-  return metrics->flush_dms_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t>> FlushDeltaMemStoresOp::RunningGauge() const {
-  const auto* metrics = tablet_peer_->tablet()->metrics();
-  return metrics->flush_dms_running;
-}
-
-//
-// LogGCOp.
-//
-
-// Registers the op as "LogGCOp(<tablet id>)" with LOW_IO_USAGE and
-// instantiates its log GC metrics on the tablet's metric entity. 'sem_'
-// starts with a single permit, which Prepare() claims and Perform() returns.
-LogGCOp::LogGCOp(TabletPeer* tablet_peer)
-    : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_peer->tablet()->tablet_id().c_str()),
-                    MaintenanceOp::LOW_IO_USAGE),
-      tablet_peer_(tablet_peer),
-      log_gc_duration_(
-          METRIC_log_gc_duration.Instantiate(tablet_peer->tablet()->GetMetricEntity())),
-      log_gc_running_(
-          METRIC_log_gc_running.Instantiate(tablet_peer->tablet()->GetMetricEntity(), 0)),
-      sem_(1) {
-}
-
-// Reports how many log bytes GC could reclaim, and marks the op runnable only
-// while the semaphore's permit is available. Leaves 'stats' untouched if the
-// GC-able size cannot be determined.
-void LogGCOp::UpdateStats(MaintenanceOpStats* stats) {
-  int64_t gcable_bytes = 0;
-  if (tablet_peer_->GetGCableDataSize(&gcable_bytes).ok()) {
-    stats->set_logs_retained_bytes(gcable_bytes);
-    stats->set_runnable(sem_.GetValue() == 1);
-  }
-}
-
-// Claims the single semaphore permit without blocking. Returns false if it is
-// already taken.
-bool LogGCOp::Prepare() {
-  const bool claimed = sem_.try_lock();
-  return claimed;
-}
-
-// Runs log GC on the tablet peer. Prepare() must already hold the semaphore,
-// which is released once GC returns.
-void LogGCOp::Perform() {
-  // The permit is expected to be held already, so this try_lock() must fail.
-  CHECK(!sem_.try_lock());
-  tablet_peer_->RunLogGC();
-  sem_.unlock();
-}
-
-// Both metrics are instantiated in the constructor.
-scoped_refptr<Histogram> LogGCOp::DurationHistogram() const {
-  scoped_refptr<Histogram> histogram = log_gc_duration_;
-  return histogram;
-}
-
-scoped_refptr<AtomicGauge<uint32_t>> LogGCOp::RunningGauge() const {
-  scoped_refptr<AtomicGauge<uint32_t>> gauge = log_gc_running_;
-  return gauge;
-}
-
-}  // namespace tablet
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.h b/src/kudu/tablet/tablet_peer_mm_ops.h
deleted file mode 100644
index a985802..0000000
--- a/src/kudu/tablet/tablet_peer_mm_ops.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_TABLET_TABLET_PEER_MM_OPS_H_
-#define KUDU_TABLET_TABLET_PEER_MM_OPS_H_
-
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/stopwatch.h"
-
-namespace kudu {
-
-class Histogram;
-template<class T>
-class AtomicGauge;
-
-namespace tablet {
-
-// Static helper shared by the flush maintenance ops to compute their
-// perf-improvement score. It is not meant to be instantiated.
-class FlushOpPerfImprovementPolicy {
- public:
-  // Scores 'stats' from its anchored RAM when that exceeds the flush threshold;
-  // otherwise from 'elapsed_ms', the time since the last flush.
-  static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms);
-
-  ~FlushOpPerfImprovementPolicy() {}
-
- private:
-  // Private and user-provided: only the static function above is meant to be used.
-  FlushOpPerfImprovementPolicy() {}
-};
-
-// Maintenance op that flushes the tablet's MemRowSet. Only one instance can run
-// at a time; this is enforced through the tablet's rowsets flush semaphore.
-class FlushMRSOp : public MaintenanceOp {
- public:
-  explicit FlushMRSOp(TabletPeer* tablet_peer)
-      : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_peer->tablet()->tablet_id().c_str()),
-                      MaintenanceOp::HIGH_IO_USAGE),
-        tablet_peer_(tablet_peer) {
-    // Count "time since last flush" from construction.
-    time_since_flush_.start();
-  }
-
-  void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-  bool Prepare() OVERRIDE;
-  void Perform() OVERRIDE;
-  scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-  scoped_refptr<AtomicGauge<uint32_t>> RunningGauge() const OVERRIDE;
-
- private:
-  // Guards time_since_flush_.
-  mutable simple_spinlock lock_;
-  Stopwatch time_since_flush_;
-
-  // Raw pointer; this class never deletes it.
-  TabletPeer* const tablet_peer_;
-};
-
-// Maintenance op that flushes delta memstores (DMS). Its stats are derived from
-// the tablet's DMSs (via the best flush candidate), but each Perform() flushes
-// only one DMS.
-class FlushDeltaMemStoresOp : public MaintenanceOp {
- public:
-  explicit FlushDeltaMemStoresOp(TabletPeer* tablet_peer)
-      : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
-                                   tablet_peer->tablet()->tablet_id().c_str()),
-                      MaintenanceOp::HIGH_IO_USAGE),
-        tablet_peer_(tablet_peer) {
-    // Count "time since last flush" from construction.
-    time_since_flush_.start();
-  }
-
-  void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-
-  // Nothing is claimed up front, so preparation always succeeds.
-  bool Prepare() OVERRIDE { return true; }
-
-  void Perform() OVERRIDE;
-  scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-  scoped_refptr<AtomicGauge<uint32_t>> RunningGauge() const OVERRIDE;
-
- private:
-  // Guards time_since_flush_.
-  mutable simple_spinlock lock_;
-  Stopwatch time_since_flush_;
-
-  // Raw pointer; this class never deletes it.
-  TabletPeer* const tablet_peer_;
-};
-
-// Maintenance op that runs log GC. The log retention it reports is the amount
-// of log data that GC could reclaim.
-//
-// At most one LogGC op runs at a time, enforced through 'sem_'.
-class LogGCOp : public MaintenanceOp {
- public:
-  explicit LogGCOp(TabletPeer* tablet_peer);
-
-  void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-  bool Prepare() OVERRIDE;
-  void Perform() OVERRIDE;
-  scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-  scoped_refptr<AtomicGauge<uint32_t>> RunningGauge() const OVERRIDE;
-
- private:
-  // Raw pointer; this class never deletes it.
-  TabletPeer* const tablet_peer_;
-  scoped_refptr<Histogram> log_gc_duration_;
-  scoped_refptr<AtomicGauge<uint32_t>> log_gc_running_;
-  // Single-permit semaphore, held from Prepare() until the end of Perform().
-  mutable Semaphore sem_;
-};
-
-} // namespace tablet
-} // namespace kudu
-
-#endif /* KUDU_TABLET_TABLET_PEER_MM_OPS_H_ */


Mime
View raw message