kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 02/02: log: separate out allocation logic
Date Sat, 09 Nov 2019 07:44:42 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 188fb4260d1609e4adfb7dca5867a40d94550f93
Author: Andrew Wong <awong@apache.org>
AuthorDate: Wed Oct 16 15:37:06 2019 -0700

    log: separate out allocation logic
    
    The Log class has a few responsibilities:
    1. adding batches to an in-memory queue asynchronously
    2. picking off batches from the queue and writing them to disk
    3. pre-allocating files in the background when a log segment grows too
       large
    4. "rolling" onto a newly pre-allocated file once it is fully allocated
    5. garbage collecting old log segments
    6. myriad other bookkeeping tasks related to indexing and size
    
    Responsibilities 1 and 2 have already been neatly encapsulated in the
    nested Log::AppendThread class, leaving the rest in the monolith that is
    the Log class.
    
    This patch moves responsibilities 3 and 4 out of the Log class and into
    a new SegmentAllocator class.
    
    This meant a few things:
    - I've pushed the schema into the SegmentAllocator. This makes sense
      because the schema is only really needed by the WAL so it can write
      out the current schema of each segment. As such, only the entity that
      manages segment allocation ought to know about it.
    - Many of the required Log members are put into a new LogContext,
      containing tablet ID, metrics, etc. We could also use this in the
      AppendThread if we wanted to, but I opted to keep this patch focused
      on the SegmentAllocator.
    - The SegmentAllocator needs to call back into the Log in order to
      update the LogReader when it rolls over. Since the LogReader is
      protected by a lock in the Log, rather than providing a back-pointer
      to the Log class or the LogReader class, this patch passes
      thread-safe Log member functions into the SegmentAllocator as
      callbacks.
    
    I tried to keep the SegmentAllocator methods in the same locations as
    their Log predecessors to keep the diff small. I'll put them closer
    together in a follow-up patch.
    
    Change-Id: I08be4dbdd8e98b02278de76273e931c314b08161
    Reviewed-on: http://gerrit.cloudera.org:8080/14479
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log-test.cc         |  16 +-
 src/kudu/consensus/log.cc              | 495 +++++++++++++++++----------------
 src/kudu/consensus/log.h               | 339 +++++++++++++---------
 src/kudu/tablet/tablet_bootstrap.cc    |   2 +-
 src/kudu/tserver/tablet_server-test.cc |   2 +-
 5 files changed, 480 insertions(+), 374 deletions(-)

diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index dfb4f62..ae15fd5 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -437,7 +437,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
   ASSERT_EQ(segments.size(), 1);
   scoped_refptr<ReadableLogSegment> readable_segment = segments[0];
 
-  int header_size = log_->active_segment_->written_offset();
+  int header_size = log_->segment_allocator_.active_segment_->written_offset();
   ASSERT_GT(header_size, 0);
   readable_segment->UpdateReadableToOffset(header_size);
 
@@ -464,7 +464,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
   int written_entries_size = header_size;
   ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size));
   ASSERT_EQ(single_entry_size * kNumEntries + header_size, written_entries_size);
-  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());
+  ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset());
 
   // Updating the readable segment with the offset of the first entry should
   // make it read a single entry even though there are several in the log.
@@ -485,11 +485,11 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
   // Offset should get updated for an additional entry.
   ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size,
             written_entries_size);
-  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());
+  ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset());
 
   // When we roll it should go back to the header size.
   ASSERT_OK(log_->AllocateSegmentAndRollOver());
-  ASSERT_EQ(header_size, log_->active_segment_->written_offset());
+  ASSERT_EQ(header_size, log_->segment_allocator_.active_segment_->written_offset());
   written_entries_size = header_size;
 
   // Now that we closed the original segment. If we get a segment from the reader
@@ -504,7 +504,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
   // Offset should get updated for an additional entry, again.
   ASSERT_OK(AppendNoOp(&op_id, &written_entries_size));
   ASSERT_EQ(single_entry_size  + header_size, written_entries_size);
-  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());
+  ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset());
 }
 
 // Tests that segments can be GC'd while the log is running.
@@ -1138,7 +1138,8 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) {
       AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2);
       ASSERT_TRUE(log_->append_thread_active_for_tests());
       debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-      ASSERT_GT(log_->active_segment_->compress_buf_.capacity(), faststring::kInitialCapacity);
+      ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(),
+                faststring::kInitialCapacity);
     });
   // After some time, the append thread should shut itself down.
   ASSERT_EVENTUALLY([&]() {
@@ -1148,7 +1149,8 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) {
   // The log should free its buffer once it is idle.
   {
     debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-    ASSERT_EQ(faststring::kInitialCapacity, log_->active_segment_->compress_buf_.capacity());
+    ASSERT_EQ(faststring::kInitialCapacity,
+              log_->segment_allocator_.active_segment_->compress_buf_.capacity());
   }
 }
 
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 27ec91e..23aeaea 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -19,6 +19,7 @@
 
 #include <cerrno>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -174,15 +175,20 @@ namespace kudu {
 namespace log {
 
 using consensus::CommitMsg;
-using consensus::OpId;
 using consensus::ReplicateRefPtr;
 using env_util::OpenFileForRandom;
+using std::bind;
+using std::function;
 using std::shared_ptr;
 using std::string;
 using std::vector;
 using std::unique_ptr;
 using strings::Substitute;
 
+string LogContext::LogPrefix() const {
+  return Substitute("T $0 P $1: ", tablet_id, fs_manager->uuid());
+}
+
 // Manages the thread which drains groups of batches from the log's queue and
 // appends them to the underlying log instance.
 //
@@ -294,7 +300,7 @@ void Log::AppendThread::Wake() {
 
 void Log::SetActiveSegmentIdle() {
   std::lock_guard<rw_spinlock> l(segment_idle_lock_);
-  active_segment_->GoIdle();
+  segment_allocator_.active_segment_->GoIdle();
 }
 
 bool Log::AppendThread::GoIdle() {
@@ -358,12 +364,12 @@ void Log::AppendThread::ProcessQueue() {
 }
 
 void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
-  if (log_->metrics_) {
-    log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
+  if (log_->ctx_.metrics) {
+    log_->ctx_.metrics->entry_batches_per_group->Increment(entry_batches.size());
   }
   TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
 
-  SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency);
+  SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
 
   bool is_all_commits = true;
   for (LogEntryBatch* entry_batch : entry_batches) {
@@ -431,10 +437,54 @@ bool Log::append_thread_active_for_tests() const {
 }
 
 
-// This task is submitted to allocation_pool_ in order to
-// asynchronously pre-allocate new log segments.
-void Log::SegmentAllocationTask() {
-  allocation_status_.Set(PreAllocateNewSegment());
+SegmentAllocator::SegmentAllocator(const LogOptions* opts,
+                                   const LogContext* ctx,
+                                   Schema schema,
+                                   uint32_t schema_version,
+                                   function<Status(scoped_refptr<ReadableLogSegment>)>
+                                       reader_replace_last_segment,
+                                   function<Status(scoped_refptr<ReadableLogSegment>)>
+                                       reader_add_segment)
+    : reader_replace_last_segment_(std::move(reader_replace_last_segment)),
+      reader_add_segment_(std::move(reader_add_segment)),
+      opts_(opts),
+      ctx_(ctx),
+      max_segment_size_(opts_->segment_size_mb * 1024 * 1024),
+      schema_(std::move(schema)),
+      schema_version_(schema_version),
+      sync_disabled_(false) {}
+
+void SegmentAllocator::StopAllocationThread() {
+  allocation_pool_->Shutdown();
+}
+
+Status SegmentAllocator::Init(uint64_t sequence_number) {
+  // Init the compression codec.
+  if (!FLAGS_log_compression_codec.empty()) {
+    auto codec_type = GetCompressionCodecType(FLAGS_log_compression_codec);
+    if (codec_type != NO_COMPRESSION) {
+      RETURN_NOT_OK_PREPEND(GetCompressionCodec(codec_type, &codec_),
+                            "could not instantiate compression codec");
+    }
+  }
+  active_segment_sequence_number = sequence_number;
+  RETURN_NOT_OK(ThreadPoolBuilder("log-alloc")
+      .set_max_threads(1)
+      .Build(&allocation_pool_));
+  return AllocateSegmentAndRollOver();
+}
+
+void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
+                                               uint32_t version) {
+  VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
+                                    version, schema.ToString());
+  std::lock_guard<rw_spinlock> l(schema_lock_);
+  schema_ = std::move(schema);
+  schema_version_ = version;
+}
+
+void SegmentAllocator::AllocationTask() {
+  allocation_status_.Set(AllocateNewSegment());
 }
 
 const Status Log::kLogShutdownStatus(
@@ -442,92 +492,67 @@ const Status Log::kLogShutdownStatus(
 
 const uint64_t Log::kInitialLogSegmentSequenceNumber = 0L;
 
-Status Log::Open(const LogOptions &options,
-                 FsManager *fs_manager,
-                 const std::string& tablet_id,
-                 const Schema& schema,
+Status Log::Open(LogOptions options,
+                 FsManager* fs_manager,
+                 const string& tablet_id,
+                 Schema schema,
                  uint32_t schema_version,
                  const scoped_refptr<MetricEntity>& metric_entity,
                  scoped_refptr<Log>* log) {
 
   string tablet_wal_path = fs_manager->GetTabletWalDir(tablet_id);
-  RETURN_NOT_OK(env_util::CreateDirIfMissing(
-      fs_manager->env(), tablet_wal_path));
-
-  scoped_refptr<Log> new_log(new Log(options,
-                                     fs_manager,
-                                     tablet_wal_path,
-                                     tablet_id,
-                                     schema,
-                                     schema_version,
-                                     metric_entity));
+  RETURN_NOT_OK(env_util::CreateDirIfMissing(fs_manager->env(), tablet_wal_path));
+
+  LogContext ctx({ tablet_id, std::move(tablet_wal_path) });
+  ctx.metric_entity = metric_entity;
+  ctx.metrics.reset(metric_entity ? new LogMetrics(metric_entity) : nullptr);
+  ctx.fs_manager = fs_manager;
+  scoped_refptr<Log> new_log(new Log(std::move(options), std::move(ctx), std::move(schema),
+                                     schema_version));
   RETURN_NOT_OK(new_log->Init());
   log->swap(new_log);
   return Status::OK();
 }
 
-Log::Log(LogOptions options, FsManager* fs_manager, string log_path,
-         string tablet_id, const Schema& schema, uint32_t schema_version,
-         scoped_refptr<MetricEntity> metric_entity)
-    : options_(options),
-      fs_manager_(fs_manager),
-      log_dir_(std::move(log_path)),
-      tablet_id_(std::move(tablet_id)),
-      schema_(schema),
-      schema_version_(schema_version),
-      active_segment_sequence_number_(0),
+Log::Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_version)
+    : options_(std::move(options)),
+      ctx_(std::move(ctx)),
       log_state_(kLogInitialized),
-      max_segment_size_(options_.segment_size_mb * 1024 * 1024),
       entry_batch_queue_(FLAGS_group_commit_queue_size_bytes),
       append_thread_(new AppendThread(this)),
-      force_sync_all_(options_.force_fsync_all),
-      sync_disabled_(false),
-      allocation_state_(kAllocationNotStarted),
-      codec_(nullptr),
-      metric_entity_(std::move(metric_entity)),
+      segment_allocator_(&options_, &ctx_, std::move(schema), schema_version,
+          bind(&Log::ReplaceSegmentInReader, this, std::placeholders::_1),
+          bind(&Log::AddEmptySegmentInReader, this, std::placeholders::_1)),
       on_disk_size_(0) {
-  CHECK_OK(ThreadPoolBuilder("log-alloc").set_max_threads(1).Build(&allocation_pool_));
-  if (metric_entity_) {
-    metrics_.reset(new LogMetrics(metric_entity_));
-  }
 }
 
 Status Log::Init() {
-  std::lock_guard<percpu_rwlock> write_lock(state_lock_);
   CHECK_EQ(kLogInitialized, log_state_);
 
-  // Init the compression codec.
-  if (!FLAGS_log_compression_codec.empty()) {
-    auto codec_type = GetCompressionCodecType(FLAGS_log_compression_codec);
-    if (codec_type != NO_COMPRESSION) {
-      RETURN_NOT_OK_PREPEND(GetCompressionCodec(codec_type, &codec_),
-                            "could not instantiate compression codec");
-    }
-  }
-
   // Init the index
-  log_index_.reset(new LogIndex(log_dir_));
+  log_index_.reset(new LogIndex(ctx_.log_dir));
 
   // Reader for previous segments.
-  RETURN_NOT_OK(LogReader::Open(fs_manager_,
+  RETURN_NOT_OK(LogReader::Open(ctx_.fs_manager,
                                 log_index_,
-                                tablet_id_,
-                                metric_entity_.get(),
+                                ctx_.tablet_id,
+                                ctx_.metric_entity.get(),
                                 &reader_));
 
   // The case where we are continuing an existing log.
   // We must pick up where the previous WAL left off in terms of
   // sequence numbers.
+  uint64_t active_segment_sequence_number = 0;
   if (reader_->num_segments() != 0) {
     VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
-                        << " segments from path: " << fs_manager_->GetWalsRootDir();
+                        << " segments from path: " << ctx_.fs_manager->GetWalsRootDir();
 
     vector<scoped_refptr<ReadableLogSegment> > segments;
     RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
-    active_segment_sequence_number_ = segments.back()->header().sequence_number();
+    active_segment_sequence_number = segments.back()->header().sequence_number();
   }
 
-  if (force_sync_all_) {
+  if (options_.force_fsync_all) {
     KLOG_FIRST_N(INFO, 1) << LogPrefix() << "Log is configured to fsync() on all Append() calls";
   } else {
     KLOG_FIRST_N(INFO, 1) << LogPrefix()
@@ -535,50 +560,56 @@ Status Log::Init() {
   }
 
   // We always create a new segment when the log starts.
-  RETURN_NOT_OK(AsyncAllocateSegment());
-  RETURN_NOT_OK(allocation_status_.Get());
-  RETURN_NOT_OK(SwitchToAllocatedSegment());
-
+  RETURN_NOT_OK(segment_allocator_.Init(active_segment_sequence_number));
   RETURN_NOT_OK(append_thread_->Init());
   log_state_ = kLogWriting;
   return Status::OK();
 }
 
-Status Log::AsyncAllocateSegment() {
-  std::lock_guard<RWMutex> l(allocation_lock_);
-  CHECK_EQ(allocation_state_, kAllocationNotStarted);
+Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
+  allocation_lock_.AssertAcquiredForWriting();
+  DCHECK_EQ(kAllocationNotStarted, allocation_state_);
   allocation_status_.Reset();
   allocation_state_ = kAllocationInProgress;
-  RETURN_NOT_OK(allocation_pool_->SubmitClosure(
-                  Bind(&Log::SegmentAllocationTask, Unretained(this))));
-  return Status::OK();
+  return allocation_pool_->SubmitClosure(
+      Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
 }
 
-Status Log::CloseCurrentSegment() {
-  if (!footer_builder_.has_min_replicate_index()) {
+Status SegmentAllocator::CloseCurrentSegment() {
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
+  }
+  RETURN_NOT_OK(Sync());
+  if (!footer_.has_min_replicate_index()) {
     VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
                         << active_segment_->path();
   }
   VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
-                      << ": " << pb_util::SecureShortDebugString(footer_builder_);
-
-  footer_builder_.set_close_timestamp_micros(GetCurrentTimeMicros());
-  RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_builder_));
+                      << ": " << pb_util::SecureShortDebugString(footer_);
 
+  footer_.set_close_timestamp_micros(GetCurrentTimeMicros());
+  RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_));
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
+  }
   return Status::OK();
 }
 
-Status Log::RollOver() {
-  SCOPED_LATENCY_METRIC(metrics_, roll_latency);
+Status SegmentAllocator::RollOver() {
+  SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);
 
-  // Check if any errors have occurred during allocation
+  // Wait for any on-going allocations to finish.
   RETURN_NOT_OK(allocation_status_.Get());
-
-  DCHECK_EQ(allocation_state(), kAllocationFinished);
-
-  RETURN_NOT_OK(Sync());
-  RETURN_NOT_OK(CloseCurrentSegment());
-
+  DCHECK_EQ(kAllocationFinished, allocation_state());
+
+  // If this isn't the first active segment, close the segment and make it
+  // available to the log reader.
+  if (active_segment_) {
+    RETURN_NOT_OK(CloseCurrentSegment());
+    scoped_refptr<ReadableLogSegment> readable_segment;
+    RETURN_NOT_OK(GetClosedSegment(&readable_segment));
+    RETURN_NOT_OK(reader_replace_last_segment_(readable_segment));
+  }
   RETURN_NOT_OK(SwitchToAllocatedSegment());
 
   VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
@@ -638,6 +669,36 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
   return Status::OK();
 }
 
+Status SegmentAllocator::AllocateOrRollOverIfNecessary(uint32_t write_size_bytes) {
+  bool should_rollover = false;
+  // if the size of this entry overflows the current segment, get a new one
+  {
+    std::lock_guard<RWMutex> l(allocation_lock_);
+    if (allocation_state_ == kAllocationNotStarted) {
+      if ((active_segment_->Size() + write_size_bytes + 4) > max_segment_size_) {
+        VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
+        RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
+        if (!opts_->async_preallocate_segments) {
+          should_rollover = true;
+        }
+      }
+    } else if (allocation_state_ == kAllocationFinished) {
+      should_rollover = true;
+    } else {
+      DCHECK(opts_->async_preallocate_segments);
+      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
+    }
+  }
+  if (should_rollover) {
+    TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
+    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
+      RETURN_NOT_OK(RollOver());
+    }
+  }
+  return Status::OK();
+}
+
+
 Status Log::WriteBatch(LogEntryBatch* entry_batch) {
   size_t num_entries = entry_batch->count();
   DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
@@ -652,48 +713,30 @@ Status Log::WriteBatch(LogEntryBatch* entry_batch) {
     return Status::OK();
   }
 
-  // if the size of this entry overflows the current segment, get a new one
-  if (allocation_state() == kAllocationNotStarted) {
-    if ((active_segment_->Size() + entry_batch_bytes + 4) > max_segment_size_) {
-      VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
-      RETURN_NOT_OK(AsyncAllocateSegment());
-      if (!options_.async_preallocate_segments) {
-        LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
-          TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
-          RETURN_NOT_OK(RollOver());
-        }
-      }
-    }
-  } else if (allocation_state() == kAllocationFinished) {
-    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
-      RETURN_NOT_OK(RollOver());
-    }
-  } else {
-    VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
-  }
-
-  int64_t start_offset = active_segment_->written_offset();
+  RETURN_NOT_OK(segment_allocator_.AllocateOrRollOverIfNecessary(entry_batch_bytes));
+  auto* active_segment = segment_allocator_.active_segment_.get();
+  int64_t start_offset = active_segment->written_offset();
 
   LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Append to log took a long time", LogPrefix())) {
-    SCOPED_LATENCY_METRIC(metrics_, append_latency);
+    SCOPED_LATENCY_METRIC(ctx_.metrics, append_latency);
     SCOPED_WATCH_STACK(500);
 
-    RETURN_NOT_OK(active_segment_->WriteEntryBatch(entry_batch_data, codec_));
+    RETURN_NOT_OK(active_segment->WriteEntryBatch(entry_batch_data, segment_allocator_.codec_));
 
     // Update the reader on how far it can read the active segment.
-    reader_->UpdateLastSegmentOffset(active_segment_->written_offset());
+    reader_->UpdateLastSegmentOffset(active_segment->written_offset());
 
-    if (log_hooks_) {
-      RETURN_NOT_OK_PREPEND(log_hooks_->PostAppend(), "PostAppend hook failed");
+    if (segment_allocator_.hooks_) {
+      RETURN_NOT_OK(segment_allocator_.hooks_->PostAppend());
     }
   }
 
-  if (metrics_) {
-    metrics_->bytes_logged->IncrementBy(entry_batch_bytes);
+  if (ctx_.metrics) {
+    ctx_.metrics->bytes_logged->IncrementBy(entry_batch_bytes);
   }
 
   CHECK_OK(UpdateIndexForBatch(*entry_batch, start_offset));
-  UpdateFooterForBatch(entry_batch);
+  segment_allocator_.UpdateFooterForBatch(*entry_batch);
 
   return Status::OK();
 }
@@ -708,70 +751,78 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
     LogIndexEntry index_entry;
 
     index_entry.op_id = entry_pb.replicate().id();
-    index_entry.segment_sequence_number = active_segment_sequence_number_;
+    index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number;
     index_entry.offset_in_segment = start_offset;
     RETURN_NOT_OK(log_index_->AddEntry(index_entry));
   }
   return Status::OK();
 }
 
-void Log::UpdateFooterForBatch(LogEntryBatch* batch) {
-  footer_builder_.set_num_entries(footer_builder_.num_entries() + batch->count());
+Status SegmentAllocator::AllocateSegmentAndRollOver() {
+  {
+    std::lock_guard<RWMutex> l(allocation_lock_);
+    RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
+  }
+  return RollOver();
+}
+
+Status Log::AllocateSegmentAndRollOver() {
+  std::lock_guard<rw_spinlock> l(segment_idle_lock_);
+  return segment_allocator_.AllocateSegmentAndRollOver();
+}
+
+void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
+  footer_.set_num_entries(footer_.num_entries() + batch.count());
 
   // We keep track of the last-written OpId here.
   // This is needed to initialize Consensus on startup.
   // We also retrieve the opid of the first operation in the batch so that, if
   // we roll over to a new segment, we set the first operation in the footer
   // immediately.
-  if (batch->type_ == REPLICATE) {
+  if (batch.type_ == REPLICATE) {
     // Update the index bounds for the current segment.
-    for (const LogEntryPB& entry_pb : batch->entry_batch_pb_->entry()) {
-      UpdateFooterForReplicateEntry(entry_pb, &footer_builder_);
+    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+      UpdateFooterForReplicateEntry(entry_pb, &footer_);
     }
   }
 }
 
-Status Log::AllocateSegmentAndRollOver() {
-  std::lock_guard<rw_spinlock> l(segment_idle_lock_);
-  RETURN_NOT_OK(AsyncAllocateSegment());
-  return RollOver();
-}
-
-FsManager* Log::GetFsManager() {
-  return fs_manager_;
-}
-
-Status Log::Sync() {
+Status SegmentAllocator::Sync() {
   TRACE_EVENT0("log", "Sync");
-  SCOPED_LATENCY_METRIC(metrics_, sync_latency);
+  SCOPED_LATENCY_METRIC(ctx_->metrics, sync_latency);
 
   if (PREDICT_FALSE(FLAGS_log_inject_latency && !sync_disabled_)) {
     Random r(GetCurrentTimeMicros());
     int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
                             FLAGS_log_inject_latency_ms_stddev);
     if (sleep_ms > 0) {
-      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
+      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
+                               << "ms of latency in SegmentAllocator::Sync()";
       SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
     }
   }
 
-  if (force_sync_all_ && !sync_disabled_) {
+  if (opts_->force_fsync_all && !false) {
     LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
       RETURN_NOT_OK(active_segment_->Sync());
 
-      if (log_hooks_) {
-        RETURN_NOT_OK_PREPEND(log_hooks_->PostSyncIfFsyncEnabled(),
+      if (hooks_) {
+        RETURN_NOT_OK_PREPEND(hooks_->PostSyncIfFsyncEnabled(),
                               "PostSyncIfFsyncEnabled hook failed");
       }
     }
   }
 
-  if (log_hooks_) {
-    RETURN_NOT_OK_PREPEND(log_hooks_->PostSync(), "PostSync hook failed");
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PostSync(), "PostSync hook failed");
   }
   return Status::OK();
 }
 
+Status Log::Sync() {
+  return segment_allocator_.Sync();
+}
+
 int GetPrefixSizeToGC(RetentionIndexes retention_indexes, const SegmentSequence& segments) {
   int rem_segs = segments.size();
   int prefix_size = 0;
@@ -836,7 +887,7 @@ Status Log::WaitUntilAllFlushed() {
 Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
   CHECK_GE(retention_indexes.for_durability, 0);
 
-  VLOG_WITH_PREFIX(1) << "Running Log GC on " << log_dir_ << ": retaining "
+  VLOG_WITH_PREFIX(1) << "Running Log GC on " << ctx_.log_dir << ": retaining "
       "ops >= " << retention_indexes.for_durability << " for durability, "
       "ops >= " << retention_indexes.for_peers << " for peers";
   VLOG_TIMING(1, Substitute("$0Log GC", LogPrefix())) {
@@ -870,7 +921,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
                              segment->footer().max_replicate_index());
       }
       LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
-      RETURN_NOT_OK(fs_manager_->env()->DeleteFile(segment->path()));
+      RETURN_NOT_OK(ctx_.fs_manager->env()->DeleteFile(segment->path()));
       (*num_gced)++;
     }
 
@@ -940,42 +991,30 @@ int64_t Log::OnDiskSize() {
   return ret;
 }
 
-void Log::SetSchemaForNextLogSegment(const Schema& schema,
+void Log::SetSchemaForNextLogSegment(Schema schema,
                                      uint32_t version) {
-  VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
-                                    version, schema.ToString());
-  std::lock_guard<rw_spinlock> l(schema_lock_);
-  schema_ = schema;
-  schema_version_ = version;
+  segment_allocator_.SetSchemaForNextSegment(std::move(schema), version);
 }
 
 Status Log::Close() {
-  allocation_pool_->Shutdown();
+  segment_allocator_.StopAllocationThread();
   append_thread_->Shutdown();
 
   std::lock_guard<percpu_rwlock> l(state_lock_);
   switch (log_state_) {
-    case kLogWriting:
-      if (log_hooks_) {
-        RETURN_NOT_OK_PREPEND(log_hooks_->PreClose(),
-                              "PreClose hook failed");
-      }
-      RETURN_NOT_OK(Sync());
-      RETURN_NOT_OK(CloseCurrentSegment());
-      RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
+    case kLogWriting: {
+      RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment());
+      scoped_refptr<ReadableLogSegment> last_segment;
+      RETURN_NOT_OK(segment_allocator_.GetClosedSegment(&last_segment));
+      RETURN_NOT_OK(reader_->ReplaceLastSegment(last_segment));
       log_state_ = kLogClosed;
       VLOG_WITH_PREFIX(1) << "Log closed";
 
       // Release FDs held by these objects.
       log_index_.reset();
       reader_.reset();
-
-      if (log_hooks_) {
-        RETURN_NOT_OK_PREPEND(log_hooks_->PostClose(),
-                              "PostClose hook failed");
-      }
       return Status::OK();
-
+    }
     case kLogClosed:
       VLOG_WITH_PREFIX(1) << "Log already closed";
       return Status::OK();
@@ -986,7 +1025,7 @@ Status Log::Close() {
 }
 
 bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) {
-  string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
+  const string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
   return fs_manager->env()->FileExists(wal_dir);
 }
 
@@ -1033,9 +1072,9 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
   return Status::OK();
 }
 
-Status Log::PreAllocateNewSegment() {
-  TRACE_EVENT1("log", "PreAllocateNewSegment", "file", next_segment_path_);
-  CHECK_EQ(allocation_state(), kAllocationInProgress);
+Status SegmentAllocator::AllocateNewSegment() {
+  TRACE_EVENT1("log", "AllocateNewSegment", "file", next_segment_path_);
+  CHECK_EQ(kAllocationInProgress, allocation_state());
 
   // We must mark allocation as finished when returning from this method.
   auto alloc_finished = MakeScopedCleanup([&] () {
@@ -1044,56 +1083,61 @@ Status Log::PreAllocateNewSegment() {
   });
 
   WritableFileOptions opts;
-  opts.sync_on_close = force_sync_all_;
-  RETURN_NOT_OK(CreatePlaceholderSegment(opts, &next_segment_path_, &next_segment_file_));
+  opts.sync_on_close = opts_->force_fsync_all;
+  string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
+  string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
+  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
+  unique_ptr<WritableFile> segment_file;
+  Env* env = ctx_->fs_manager->env();
+  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(opts,
+                                         path_tmpl,
+                                         &next_segment_path_,
+                                         &segment_file), "ASDF");
+  next_segment_file_.reset(segment_file.release());
+  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
 
   MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_preallocate_fraction,
-                       Status::IOError("Injected IOError in Log::PreAllocateNewSegment()"));
+      Status::IOError("Injected IOError in SegmentAllocator::AllocateNewSegment()"));
 
-  if (options_.preallocate_segments) {
-    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(fs_manager_->env(),
+  if (opts_->preallocate_segments) {
+    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(env,
                                                       next_segment_path_,
                                                       max_segment_size_,
                                                       FLAGS_fs_wal_dir_reserved_bytes));
     // TODO (perf) zero the new segments -- this could result in
     // additional performance improvements.
-    RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));
+    RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_), "E");
   }
-
   return Status::OK();
 }
 
-Status Log::SwitchToAllocatedSegment() {
-  CHECK_EQ(allocation_state(), kAllocationFinished);
-
+Status SegmentAllocator::SwitchToAllocatedSegment() {
   // Increment "next" log segment seqno.
-  active_segment_sequence_number_++;
-
-  string new_segment_path = fs_manager_->GetWalSegmentFileName(tablet_id_,
-                                                               active_segment_sequence_number_);
-
-  RETURN_NOT_OK(fs_manager_->env()->RenameFile(next_segment_path_, new_segment_path));
-  if (force_sync_all_) {
-    RETURN_NOT_OK(fs_manager_->env()->SyncDir(log_dir_));
+  active_segment_sequence_number++;
+  const auto& tablet_id = ctx_->tablet_id;
+  string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id,
+                                                                    active_segment_sequence_number);
+  Env* env = ctx_->fs_manager->env();
+  RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
+  if (opts_->force_fsync_all) {
+    RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
   }
 
-  // Create a new segment.
-  gscoped_ptr<WritableLogSegment> new_segment(
-      new WritableLogSegment(new_segment_path, next_segment_file_));
+  // Create a new segment in memory.
+  unique_ptr<WritableLogSegment> new_segment(
+      new WritableLogSegment(new_segment_path, std::move(next_segment_file_)));
 
   // Set up the new header and footer.
   LogSegmentHeaderPB header;
-  header.set_sequence_number(active_segment_sequence_number_);
-  header.set_tablet_id(tablet_id_);
-
+  header.set_sequence_number(active_segment_sequence_number);
+  header.set_tablet_id(tablet_id);
   if (codec_) {
     header.set_compression_codec(codec_->type());
   }
 
   // Set up the new footer. This will be maintained as the segment is written.
-  footer_builder_.Clear();
-  footer_builder_.set_num_entries(0);
-
+  footer_.Clear();
+  footer_.set_num_entries(0);
 
   // Set the new segment's schema.
   {
@@ -1101,17 +1145,7 @@ Status Log::SwitchToAllocatedSegment() {
     RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
     header.set_schema_version(schema_version_);
   }
-
-  RETURN_NOT_OK(new_segment->WriteHeaderAndOpen(header));
-
-  // Transform the currently-active segment into a readable one, since we
-  // need to be able to replay the segments for other peers.
-  {
-    if (active_segment_.get() != nullptr) {
-      std::lock_guard<percpu_rwlock> l(state_lock_);
-      CHECK_OK(ReplaceSegmentInReaderUnlocked());
-    }
-  }
+  RETURN_NOT_OK_PREPEND(new_segment->WriteHeaderAndOpen(header), "Failed to write header");
 
   // Open the segment we just created in readable form and add it to the reader.
   // TODO(todd): consider using a global FileCache here? With short log segments and
@@ -1119,58 +1153,49 @@ Status Log::SwitchToAllocatedSegment() {
   unique_ptr<RandomAccessFile> readable_file;
 
   RandomAccessFileOptions opts;
-  RETURN_NOT_OK(fs_manager_->env()->NewRandomAccessFile(opts, new_segment_path, &readable_file));
+  RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
   scoped_refptr<ReadableLogSegment> readable_segment(
     new ReadableLogSegment(new_segment_path,
                            shared_ptr<RandomAccessFile>(readable_file.release())));
   RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
-  RETURN_NOT_OK(reader_->AppendEmptySegment(readable_segment));
+  RETURN_NOT_OK(reader_add_segment_(readable_segment));
 
   // Now set 'active_segment_' to the new segment.
-  active_segment_.reset(new_segment.release());
+  active_segment_ = std::move(new_segment);
 
+  std::lock_guard<RWMutex> l(allocation_lock_);
   allocation_state_ = kAllocationNotStarted;
-
   return Status::OK();
 }
 
-Status Log::ReplaceSegmentInReaderUnlocked() {
-  // We should never switch to a new segment if we wrote nothing to the old one.
+Status SegmentAllocator::GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment) {
   CHECK(active_segment_->IsClosed());
   shared_ptr<RandomAccessFile> readable_file;
-  RETURN_NOT_OK(OpenFileForRandom(fs_manager_->env(), active_segment_->path(), &readable_file));
-  scoped_refptr<ReadableLogSegment> readable_segment(
-      new ReadableLogSegment(active_segment_->path(),
-                             readable_file));
-  // Note: active_segment_->header() will only contain an initialized PB if we
+  RETURN_NOT_OK(
+      OpenFileForRandom(ctx_->fs_manager->env(), active_segment_->path(), &readable_file));
+  scoped_refptr<ReadableLogSegment> segment(
+      new ReadableLogSegment(active_segment_->path(), readable_file));
+  // Note: segment->header() will only contain an initialized PB if we
   // wrote the header out.
-  RETURN_NOT_OK(readable_segment->Init(active_segment_->header(),
-                                       active_segment_->footer(),
-                                       active_segment_->first_entry_offset()));
-
-  return reader_->ReplaceLastSegment(readable_segment);
+  RETURN_NOT_OK(segment->Init(active_segment_->header(),
+                              active_segment_->footer(),
+                              active_segment_->first_entry_offset()));
+  *readable_segment = std::move(segment);
+  return Status::OK();
 }
 
-Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
-                                     string* result_path,
-                                     shared_ptr<WritableFile>* out) {
-  string tmp_suffix = strings::Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
-  string path_tmpl = JoinPathSegments(log_dir_, tmp_suffix);
-  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
-  unique_ptr<WritableFile> segment_file;
-  RETURN_NOT_OK(fs_manager_->env()->NewTempWritableFile(opts,
-                                                        path_tmpl,
-                                                        result_path,
-                                                        &segment_file));
-  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
-  out->reset(segment_file.release());
-  return Status::OK();
+Status Log::AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+  std::lock_guard<percpu_rwlock> l(state_lock_);
+  return reader_->AppendEmptySegment(segment);
 }
 
-std::string Log::LogPrefix() const {
-  return Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid());
+Status Log::ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+  std::lock_guard<percpu_rwlock> l(state_lock_);
+  return reader_->ReplaceLastSegment(segment);
 }
 
+std::string Log::LogPrefix() const { return ctx_.LogPrefix(); }
+
 Log::~Log() {
   WARN_NOT_OK(Close(), "Error closing log");
 }
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 269237b..7fcfcdd 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <limits>
 #include <map>
 #include <memory>
@@ -31,6 +32,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.pb.h"
+#include "kudu/consensus/log_metrics.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/ref_counted_replicate.h"
@@ -41,59 +43,223 @@
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/promise.h"
 #include "kudu/util/rw_mutex.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
+#include "kudu/util/threadpool.h"
 
 namespace kudu {
 
 class CompressionCodec;
 class FsManager;
-class MetricEntity;
-class ThreadPool;
 class WritableFile;
-struct WritableFileOptions;
 
 namespace log {
 
-struct LogEntryBatchLogicalSize;
-struct LogMetrics;
-struct RetentionIndexes;
 class LogEntryBatch;
+class LogFaultHooks;
 class LogIndex;
 class LogReader;
+struct LogEntryBatchLogicalSize;
+struct RetentionIndexes;
+
+// Context used by the various classes that operate on the Log.
+struct LogContext {
+  const std::string tablet_id;
+  const std::string log_dir;
+
+  scoped_refptr<MetricEntity> metric_entity;
+  std::unique_ptr<LogMetrics> metrics;
+  FsManager* fs_manager;
+
+  std::string LogPrefix() const;
+};
 
 typedef BlockingQueue<LogEntryBatch*, LogEntryBatchLogicalSize> LogEntryBatchQueue;
 
+// State of segment allocation.
+enum SegmentAllocationState {
+  kAllocationNotStarted, // No segment allocation requested
+  kAllocationInProgress, // Next segment allocation started
+  kAllocationFinished // Next segment ready
+};
+
+// Encapsulates the logic around allocating log segments.
+//
+// Methods of this class are not threadsafe, unless otherwise mentioned.
+// It is expected that segment allocation is driven through a single thread
+// (presumably the append thread, as operations are written).
+class SegmentAllocator {
+ public:
+  // Creates a new SegmentAllocator. The allocator isn't usable until Init() is
+  // called. 'opts' and 'ctx' are options and various context variables that
+  // are relevant for the Log for which we are allocating segments. 'schema'
+  // and 'schema_version' define the initial schema for the Log.
+  // 'reader_replace_last_segment' is a functor that should be called upon
+  // closing a segment to make the segment readable by the Log's reader.
+  // 'reader_add_segment' is a functor that should be called upon initializing
+  // a new segment to make the segment readable by the Log's reader.
+  SegmentAllocator(const LogOptions* opts,
+                   const LogContext* ctx,
+                   Schema schema,
+                   uint32_t schema_version,
+                   std::function<Status(scoped_refptr<ReadableLogSegment>)>
+                       reader_replace_last_segment,
+                   std::function<Status(scoped_refptr<ReadableLogSegment>)>
+                       reader_add_segment);
+
+  // Sets the currently active segment number, starts the threadpool, and
+  // synchronously allocates an active segment.
+  Status Init(uint64_t sequence_number);
+
+  // Checks whether it's time to allocate (e.g. the current segment is full)
+  // and/or roll over (e.g. a previous pre-allocation has finished), and does
+  // so as appropriate.
+  //
+  // 'write_size_bytes' is the expected size of the next write; if the active
+  // segment would go above the max segment size, a new segment is allocated.
+  Status AllocateOrRollOverIfNecessary(uint32_t write_size_bytes);
+
+  // Fsyncs the currently active segment to disk.
+  Status Sync();
+
+  // Syncs the current segment and writes out the footer.
+  Status CloseCurrentSegment();
+
+  // Update the footer based on the written 'batch', e.g. to track the
+  // last-written OpId.
+  void UpdateFooterForBatch(const LogEntryBatch& batch);
+
+  // Shuts down the allocator threadpool. Note that this _doesn't_ close the
+  // current active segment.
+  void StopAllocationThread();
+
+  uint64_t active_segment_sequence_number = 0;
+
+  std::string LogPrefix() const { return ctx_->LogPrefix(); }
+
+ private:
+  friend class Log;
+  friend class LogTest;
+  FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
+  FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
+  SegmentAllocationState allocation_state() {
+    shared_lock<RWMutex> l(allocation_lock_);
+    return allocation_state_;
+  }
+
+  // This is not thread-safe. It is up to the caller to ensure this does not
+  // interfere with the append thread's attempts to switch log segments.
+  Status AllocateSegmentAndRollOver();
+
+  // Returns a readable segment pointing at the most recently closed segment.
+  Status GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment);
+
+  void SetSchemaForNextSegment(Schema schema, uint32_t version);
+
+  // Schedules a task to allocate a new log segment.
+  // Must be called when the allocation_lock_ is held.
+  Status AsyncAllocateSegmentUnlocked();
+
+  // Task to be put onto the allocation_pool_ that allocates segments.
+  void AllocationTask();
+
+  // Creates a temporary file, populating 'next_segment_file_' and
+  // 'next_segment_path_', and pre-allocating 'max_segment_size_' bytes if
+  // pre-allocation is enabled.
+  Status AllocateNewSegment();
+
+  // Swaps in the next segment file as the new active segment.
+  Status SwitchToAllocatedSegment();
+
+  // Waits for any on-going allocation to complete and rolls over onto the
+  // allocated segment, swapping out the previous active segment (if one
+  // existed).
+  Status RollOver();
+
+  // Function call that replaces the last segment in the Log's log reader with
+  // the input readable log segment.
+  const std::function<Status(scoped_refptr<ReadableLogSegment>)> reader_replace_last_segment_;
+
+  // Function call that adds the input segment to the Log's log reader.
+  const std::function<Status(scoped_refptr<ReadableLogSegment>)> reader_add_segment_;
+
+  // Hooks used to inject faults into the allocator.
+  std::shared_ptr<LogFaultHooks> hooks_;
+
+  // Descriptors for the segment file that should be used as the next active
+  // segment.
+  std::shared_ptr<WritableFile> next_segment_file_;
+  std::string next_segment_path_;
+
+  // Contains state shared by various Log-related classes.
+  const LogOptions* opts_;
+  const LogContext* ctx_;
+
+  // The maximum segment size, in bytes.
+  uint64_t max_segment_size_;
+
+  // The codec used to compress entries, or nullptr if not configured.
+  const CompressionCodec* codec_ = nullptr;
+
+  // The schema and schema version to be used for the next segment.
+  mutable rw_spinlock schema_lock_;
+  Schema schema_;
+  uint32_t schema_version_;
+
+  // Whether fsyncing has been disabled.
+  // This is used to disable fsync during bootstrap.
+  bool sync_disabled_;
+
+  // A footer being prepared for the current segment.
+  // When the segment is closed, it will be written.
+  LogSegmentFooterPB footer_;
+
+  // The currently active segment being written.
+  std::unique_ptr<WritableLogSegment> active_segment_;
+
+  // Protects allocation_state_.
+  mutable RWMutex allocation_lock_;
+  SegmentAllocationState allocation_state_ = kAllocationNotStarted;
+
+  // Single-threaded threadpool on which to allocate segments.
+  std::unique_ptr<ThreadPool> allocation_pool_;
+  Promise<Status> allocation_status_;
+};
+
 // Log interface, inspired by Raft's (logcabin) Log. Provides durability to
 // Kudu as a normal Write Ahead Log and also plays the role of persistent
 // storage for the consensus state machine.
 //
-// Log uses group commit to improve write throughput and latency
-// without compromising ordering and durability guarantees. A single background
-// thread per Log instance is responsible for accumulating pending writes
-// and flushing them to the log.
+// Log uses group commit to improve write throughput and latency without
+// compromising ordering and durability guarantees. A single background thread
+// (in AppendThread) per Log instance is responsible for accumulating pending
+// writes and writing them to disk.
 //
-// This class is thread-safe unless otherwise noted.
+// A separate background thread (in SegmentAllocator) per Log instance is
+// responsible for synchronously allocating or asynchronously pre-allocating
+// segment files as written entries fill up segments.
+//
+// The public interface of this class is thread-safe unless otherwise noted.
 //
 // Note: The Log needs to be Close()d before any log-writing class is
 // destroyed, otherwise the Log might hold references to these classes
 // to execute the callbacks after each write.
 class Log : public RefCountedThreadSafe<Log> {
  public:
-  class LogFaultHooks;
 
   static const Status kLogShutdownStatus;
   static const uint64_t kInitialLogSegmentSequenceNumber;
 
   // Opens or continues a log and sets 'log' to the newly built Log.
   // After a successful Open() the Log is ready to receive entries.
-  static Status Open(const LogOptions &options,
+  static Status Open(LogOptions options,
                      FsManager *fs_manager,
                      const std::string& tablet_id,
-                     const Schema& schema,
+                     Schema schema,
                      uint32_t schema_version,
                      const scoped_refptr<MetricEntity>& metric_entity,
                      scoped_refptr<Log> *log);
@@ -102,6 +268,7 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Synchronously append a new entry to the log.
   // Log does not take ownership of the passed 'entry'.
+  // This is not thread-safe.
   Status Append(LogEntryPB* entry);
 
   // Append the given set of replicate messages, asynchronously.
@@ -120,14 +287,6 @@ class Log : public RefCountedThreadSafe<Log> {
   // are flushed and fsynced (if fsync of log entries is enabled).
   Status WaitUntilAllFlushed();
 
-  // Kick off an asynchronous task that pre-allocates a new
-  // log-segment, setting 'allocation_status_'. To wait for the
-  // result of the task, use allocation_status_.Get().
-  Status AsyncAllocateSegment();
-
-  // The closure submitted to allocation_pool_ to allocate a new segment.
-  void SegmentAllocationTask();
-
   // Syncs all state and closes the log.
   Status Close();
 
@@ -149,24 +308,26 @@ class Log : public RefCountedThreadSafe<Log> {
   std::shared_ptr<LogReader> reader() const { return reader_; }
 
   void SetMaxSegmentSizeForTests(uint64_t max_segment_size) {
-    max_segment_size_ = max_segment_size;
+    segment_allocator_.max_segment_size_ = max_segment_size;
   }
 
+  // This is not thread-safe.
   void DisableSync() {
-    sync_disabled_ = true;
+    segment_allocator_.sync_disabled_ = true;
   }
 
   // If we previous called DisableSync(), we should restore the
   // default behavior and then call Sync() which will perform the
   // actual syncing if required.
+  // This is not thread-safe.
   Status ReEnableSyncIfRequired() {
-    sync_disabled_ = false;
-    return Sync();
+    segment_allocator_.sync_disabled_ = false;
+    return segment_allocator_.Sync();
   }
 
   // Get ID of tablet.
   const std::string& tablet_id() const {
-    return tablet_id_;
+    return ctx_.tablet_id;
   }
 
   // Runs the garbage collector on the set of previous segments. Segments that
@@ -212,7 +373,7 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Returns the file system location of the currently active WAL segment.
   const std::string& ActiveSegmentPathForTests() const {
-    return active_segment_->path();
+    return segment_allocator_.active_segment_->path();
   }
 
   // Return true if the append thread is currently active.
@@ -225,20 +386,18 @@ class Log : public RefCountedThreadSafe<Log> {
   // This is not thread-safe. Used in test only.
   Status AllocateSegmentAndRollOver();
 
-  // Returns this Log's FsManager.
-  FsManager* GetFsManager();
-
-  void SetLogFaultHooksForTests(const std::shared_ptr<LogFaultHooks> &hooks) {
-    log_hooks_ = hooks;
+  void SetLogFaultHooksForTests(const std::shared_ptr<LogFaultHooks>& hooks) {
+    segment_allocator_.hooks_ = hooks;
   }
 
   // Set the schema for the _next_ log segment.
   //
   // This method is thread-safe.
-  void SetSchemaForNextLogSegment(const Schema& schema, uint32_t version);
+  void SetSchemaForNextLogSegment(Schema schema, uint32_t version);
  private:
   friend class LogTest;
   friend class LogTestBase;
+  friend class SegmentAllocator;
   FRIEND_TEST(LogTestOptionalCompression, TestMultipleEntriesInABatch);
   FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
   FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
@@ -253,23 +412,11 @@ class Log : public RefCountedThreadSafe<Log> {
     kLogClosed
   };
 
-  // State of segment (pre-) allocation.
-  enum SegmentAllocationState {
-    kAllocationNotStarted, // No segment allocation requested
-    kAllocationInProgress, // Next segment allocation started
-    kAllocationFinished // Next segment ready
-  };
-
-  Log(LogOptions options, FsManager* fs_manager, std::string log_path,
-      std::string tablet_id, const Schema& schema, uint32_t schema_version,
-      scoped_refptr<MetricEntity> metric_entity);
+  Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_version);
 
   // Initializes a new one or continues an existing log.
   Status Init();
 
-  // Make segments roll over.
-  Status RollOver();
-
   // Sets that the current active segment is idle.
   void SetActiveSegmentIdle();
 
@@ -282,26 +429,7 @@ class Log : public RefCountedThreadSafe<Log> {
   Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch,
                      const StatusCallback& callback);
 
-  // Writes the footer and closes the current segment.
-  Status CloseCurrentSegment();
-
-  // Sets 'out' to a newly created temporary file (see
-  // Env::NewTempWritableFile()) for a placeholder segment. Sets
-  // 'result_path' to the fully qualified path to the unique filename
-  // created for the segment.
-  Status CreatePlaceholderSegment(const WritableFileOptions& opts,
-                                  std::string* result_path,
-                                  std::shared_ptr<WritableFile>* out);
-
-  // Creates a new WAL segment on disk, writes the next_segment_header_ to
-  // disk as the header, and sets active_segment_ to point to this new segment.
-  Status SwitchToAllocatedSegment();
-
-  // Preallocates the space for a new segment.
-  Status PreAllocateNewSegment();
-
-  // Writes serialized contents of 'entry' to the log. Called inside
-  // AppenderThread.
+  // Writes serialized contents of 'entry' to the log. This is not thread-safe.
   Status WriteBatch(LogEntryBatch* entry_batch);
 
   // Update footer_builder_ to reflect the log indexes seen in 'batch'.
@@ -314,8 +442,11 @@ class Log : public RefCountedThreadSafe<Log> {
                              int64_t start_offset);
 
   // Replaces the last "empty" segment in 'log_reader_', i.e. the one currently
-  // being written to, by the same segment once properly closed.
-  Status ReplaceSegmentInReaderUnlocked();
+  // being written to, with the same segment once properly closed.
+  Status ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
+
+  // Adds the given segment to 'log_reader_'.
+  Status AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
 
   Status Sync();
 
@@ -327,43 +458,16 @@ class Log : public RefCountedThreadSafe<Log> {
     return &entry_batch_queue_;
   }
 
-  const SegmentAllocationState allocation_state() {
-    shared_lock<RWMutex> l(allocation_lock_);
-    return allocation_state_;
-  }
-
   std::string LogPrefix() const;
 
   LogOptions options_;
-  FsManager *fs_manager_;
+  LogContext ctx_;
   std::string log_dir_;
 
-  // The ID of the tablet this log is dedicated to.
-  std::string tablet_id_;
-
-  // Lock to protect modifications to schema_ and schema_version_.
-  mutable rw_spinlock schema_lock_;
-
-  // The current schema of the tablet this log is dedicated to.
-  Schema schema_;
-  // The schema version
-  uint32_t schema_version_;
-
-  // The currently active segment being written.
-  std::unique_ptr<WritableLogSegment> active_segment_;
-
-  // The current (active) segment sequence number.
-  uint64_t active_segment_sequence_number_;
-
-  // The writable file for the next allocated segment
-  std::shared_ptr<WritableFile> next_segment_file_;
-
-  // The path for the next allocated segment.
-  std::string next_segment_path_;
-
   // Lock to protect mutations to log_state_ and other shared state variables.
+  // Generally this is used to ensure adding and removing segments from the log
+  // reader is threadsafe.
   mutable percpu_rwlock state_lock_;
-
   LogState log_state_;
 
   // A reader for the previous segments that were not yet GC'd.
@@ -375,49 +479,23 @@ class Log : public RefCountedThreadSafe<Log> {
   // of the operation in the log.
   scoped_refptr<LogIndex> log_index_;
 
-  // A footer being prepared for the current segment.
-  // When the segment is closed, it will be written.
-  LogSegmentFooterPB footer_builder_;
-
   // The maximum segment size, in bytes.
   uint64_t max_segment_size_;
 
-  // The queue used to communicate between the threads appending operations
-  // and the thread which actually appends them to the log.
+  // The queue used to communicate between the threads appending operations to
+  // the log and the thread that actually writes those operations to disk.
   LogEntryBatchQueue entry_batch_queue_;
 
   // Thread writing to the log
-  gscoped_ptr<AppendThread> append_thread_;
-
-  std::unique_ptr<ThreadPool> allocation_pool_;
+  std::unique_ptr<AppendThread> append_thread_;
 
-  // If true, sync on all appends.
-  bool force_sync_all_;
-
-  // If true, ignore the 'force_sync_all_' flag above.
-  // This is used to disable fsync during bootstrap.
-  bool sync_disabled_;
-
-  // The status of the most recent log-allocation action.
-  Promise<Status> allocation_status_;
+  SegmentAllocator segment_allocator_;
 
   // Protects the active segment as it is going idle, in case other threads
   // attempt to switch segments concurrently. This shouldn't happen in
   // production, but may happen if AllocateSegmentAndRollOver() is called.
   mutable rw_spinlock segment_idle_lock_;
 
-  // Read-write lock to protect 'allocation_state_'.
-  mutable RWMutex allocation_lock_;
-  SegmentAllocationState allocation_state_;
-
-  // The codec used to compress entries, or nullptr if not configured.
-  const CompressionCodec* codec_;
-
-  scoped_refptr<MetricEntity> metric_entity_;
-  gscoped_ptr<LogMetrics> metrics_;
-
-  std::shared_ptr<LogFaultHooks> log_hooks_;
-
   // The cached on-disk size of the log, used to track its size even if it has been closed.
   std::atomic<int64_t> on_disk_size_;
 
@@ -465,6 +543,7 @@ class LogEntryBatch {
   friend class Log;
   friend struct LogEntryBatchLogicalSize;
   friend class MultiThreadedLogTest;
+  friend class SegmentAllocator;
 
   LogEntryBatch(LogEntryTypePB type,
                 std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
@@ -548,7 +627,7 @@ struct LogEntryBatchLogicalSize {
   }
 };
 
-class Log::LogFaultHooks {
+class LogFaultHooks {
  public:
 
   // Executed immediately before returning from Log::Sync() at *ALL*
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 5746a53..8cf55ea 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1482,7 +1482,7 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
   if (!tx_state.error()) {
     // If the alter completed successfully, update the log segment header. Note
     // that our new log isn't hooked up to the tablet yet.
-    log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
+    log_->SetSchemaForNextLogSegment(std::move(schema), tx_state.schema_version());
   }
 
   return AppendCommitMsg(commit_msg);
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 9abf3d8..467c74d 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -3658,7 +3658,7 @@ TEST_F(TabletServerTest, TestChecksumScan) {
   ASSERT_FALSE(resp.has_more_results());
 }
 
-class DelayFsyncLogHook : public log::Log::LogFaultHooks {
+class DelayFsyncLogHook : public log::LogFaultHooks {
  public:
   DelayFsyncLogHook() : log_latch1_(1), test_latch1_(1) {}
 


Mime
View raw message