kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 02/03: log: use RWFile in log segments
Date Fri, 20 Dec 2019 19:41:20 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 702e33e89d873f4b1ab22ac8b9957da39766ffd7
Author: Adar Dembo <adar@cloudera.com>
AuthorDate: Wed Dec 18 20:49:42 2019 -0800

    log: use RWFile in log segments
    
    I'm working towards the eventual goal of caching log segments in the file
    cache, but for that to work, the segments' use of file handles must be safe
    for caching. Currently, it is not:
    1. WritableLogSegment uses WritableFile, which has mutable in-memory state.
    2. The "active" segment in the log has both a writable and readable log
       segment open at the same time, each of which uses a different file handle
       implementation. The file cache could allow that, but it adds complexity
       that I'd rather avoid.
    3. The log expects the "active" segment to be truncated and sync'ed when
       closed, but that's not safe in a world where the segment may drop out of
       the cache and get closed arbitrarily.
    
    We can fix all of these issues by converting log segments to use RWFile.
    It's not "pure" (especially for ReadableLogSegment, which doesn't write), but
    it gets the job done.
    
    There are some notable semantic changes:
    1. The log must explicitly truncate/sync when finishing the active segment.
    2. Because both kinds of segments now use the same file handle abstraction,
       we no longer need to close and reopen the underlying file handle when
       finishing the active segment; we can just share the existing file handle.
    3. Likewise, when opening a ReadableLogSegment for the just-switched-to
       active segment, we can reuse the file handle from the WritableLogSegment
       rather than opening a second file handle.
    
    Change-Id: I65b39e219e76876df16e698211eb558ab31329c8
    Reviewed-on: http://gerrit.cloudera.org:8080/14931
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log-test.cc                 |  14 ++--
 src/kudu/consensus/log.cc                      | 111 +++++++++++++------------
 src/kudu/consensus/log.h                       |  31 ++++---
 src/kudu/consensus/log_util.cc                 |  58 +++++++------
 src/kudu/consensus/log_util.h                  |  60 +++++++------
 src/kudu/tserver/tablet_copy_service-test.cc   |   2 +-
 src/kudu/tserver/tablet_copy_source_session.cc |   6 +-
 src/kudu/tserver/tablet_copy_source_session.h  |  16 ++--
 8 files changed, 147 insertions(+), 151 deletions(-)

diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 2bf9ad9..b50c5d2 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -129,11 +129,13 @@ class LogTest : public LogTestBase {
     string fqp = GetTestPath(strings::Substitute("wal-00000000$0", sequence_number));
     unique_ptr<WritableFile> w_log_seg;
     RETURN_NOT_OK(fs_manager_->env()->NewWritableFile(fqp, &w_log_seg));
-    unique_ptr<RandomAccessFile> r_log_seg;
-    RETURN_NOT_OK(fs_manager_->env()->NewRandomAccessFile(fqp, &r_log_seg));
+    unique_ptr<RWFile> r_log_seg;
+    RWFileOptions opts;
+    opts.mode = Env::MUST_EXIST;
+    RETURN_NOT_OK(fs_manager_->env()->NewRWFile(opts, fqp, &r_log_seg));
 
     scoped_refptr<ReadableLogSegment> readable_segment(
-        new ReadableLogSegment(fqp, shared_ptr<RandomAccessFile>(r_log_seg.release())));
+        new ReadableLogSegment(fqp, shared_ptr<RWFile>(r_log_seg.release())));
 
     LogSegmentHeaderPB header;
     header.set_sequence_number(sequence_number);
@@ -322,9 +324,11 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
   OpId op_id = MakeOpId(1, 1);
   ASSERT_OK(AppendNoOps(&op_id, kNumEntries));
 
-  // Find the entry that we want to corrupt before closing the log.
+  // Find the entry that we want to corrupt and get the active segment path
+  // before closing the log; both will be invalid after.
   LogIndexEntry entry;
   ASSERT_OK(log_->log_index_->GetEntry(4, &entry));
+  string active_segment_path = log_->ActiveSegmentPathForTests();
 
   ASSERT_OK(log_->Close());
 
@@ -340,7 +344,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
     default:
       LOG(FATAL) << "unreachable";
   }
-  ASSERT_OK(CorruptLogFile(env_, log_->ActiveSegmentPathForTests(), type, offset));
+  ASSERT_OK(CorruptLogFile(env_, active_segment_path, type, offset));
 
   // Open a new reader -- we don't reuse the existing LogReader from log_
   // because it has a cached header.
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 8640b3b..b2b8000 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -461,22 +461,22 @@ Status SegmentAllocator::Init(
       .set_max_threads(1)
       .Build(&allocation_pool_));
 
-  scoped_refptr<ReadableLogSegment> closed_segment;
-  RETURN_NOT_OK(AllocateSegmentAndRollOver(&closed_segment, new_readable_segment));
-  DCHECK(!closed_segment); // There was no previously active segment.
+  scoped_refptr<ReadableLogSegment> finished_segment;
+  RETURN_NOT_OK(AllocateSegmentAndRollOver(&finished_segment, new_readable_segment));
+  DCHECK(!finished_segment); // There was no previously active segment.
   return Status::OK();
 }
 
 Status SegmentAllocator::AllocateOrRollOverIfNecessary(
     uint32_t write_size_bytes,
-    scoped_refptr<ReadableLogSegment>* closed_segment,
+    scoped_refptr<ReadableLogSegment>* finished_segment,
     scoped_refptr<ReadableLogSegment>* new_readable_segment) {
   bool should_rollover = false;
   // if the size of this entry overflows the current segment, get a new one
   {
     std::lock_guard<RWMutex> l(allocation_lock_);
     if (allocation_state_ == kAllocationNotStarted) {
-      if ((active_segment_->Size() + write_size_bytes + 4) > max_segment_size_) {
+      if ((active_segment_->written_offset() + write_size_bytes + 4) > max_segment_size_)
{
         VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
         RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
         if (!opts_->async_preallocate_segments) {
@@ -493,7 +493,7 @@ Status SegmentAllocator::AllocateOrRollOverIfNecessary(
   if (should_rollover) {
     TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
     LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix()))
{
-      RETURN_NOT_OK(RollOver(closed_segment, new_readable_segment));
+      RETURN_NOT_OK(RollOver(finished_segment, new_readable_segment));
     }
   }
   return Status::OK();
@@ -531,12 +531,11 @@ Status SegmentAllocator::Sync() {
   return Status::OK();
 }
 
-Status SegmentAllocator::CloseCurrentSegment(
-    scoped_refptr<ReadableLogSegment>* closed_segment) {
+Status SegmentAllocator::FinishCurrentSegment(
+    scoped_refptr<ReadableLogSegment>* finished_segment) {
   if (hooks_) {
     RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
   }
-  RETURN_NOT_OK(Sync());
   if (!footer_.has_min_replicate_index()) {
     VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment:
"
                         << active_segment_->path();
@@ -545,13 +544,35 @@ Status SegmentAllocator::CloseCurrentSegment(
                       << ": " << pb_util::SecureShortDebugString(footer_);
 
   footer_.set_close_timestamp_micros(GetCurrentTimeMicros());
-  RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_));
+  RETURN_NOT_OK(active_segment_->WriteFooter(footer_));
+
+  // max_segment_size_ defines the (soft) limit of a segment. When preallocation
+  // is enabled, max_segment_size also defines the amount of space that is
+  // preallocated at segment creation time.
+  //
+  // We finish a segment when the next write would exceed max_segment_size_, at
+  // which point all of the segment's preallocated space has been consumed. In
+  // some cases (e.g. Log::Close), a segment may be finished prematurely. If we
+  // detect that, let's return any excess preallocated space back to the
+  // filesystem by truncating off the end of the segment.
+  if (opts_->preallocate_segments &&
+      active_segment_->written_offset() < max_segment_size_) {
+    RETURN_NOT_OK(active_segment_->file()->Truncate(
+        active_segment_->written_offset()));
+  }
+  RETURN_NOT_OK(Sync());
+
   if (hooks_) {
     RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
   }
 
-  if (closed_segment) {
-    RETURN_NOT_OK(GetClosedSegment(closed_segment));
+  if (finished_segment) {
+    scoped_refptr<ReadableLogSegment> segment(
+        new ReadableLogSegment(active_segment_->path(), active_segment_->file()));
+    RETURN_NOT_OK(segment->Init(active_segment_->header(),
+                                active_segment_->footer(),
+                                active_segment_->first_entry_offset()));
+    *finished_segment = std::move(segment);
   }
 
   return Status::OK();
@@ -578,29 +599,13 @@ void SegmentAllocator::StopAllocationThread() {
 }
 
 Status SegmentAllocator::AllocateSegmentAndRollOver(
-    scoped_refptr<ReadableLogSegment>* closed_segment,
+    scoped_refptr<ReadableLogSegment>* finished_segment,
     scoped_refptr<ReadableLogSegment>* new_readable_segment) {
   {
     std::lock_guard<RWMutex> l(allocation_lock_);
     RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
   }
-  return RollOver(closed_segment, new_readable_segment);
-}
-
-Status SegmentAllocator::GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment)
{
-  CHECK(active_segment_->IsClosed());
-  shared_ptr<RandomAccessFile> readable_file;
-  RETURN_NOT_OK(
-      OpenFileForRandom(ctx_->fs_manager->env(), active_segment_->path(), &readable_file));
-  scoped_refptr<ReadableLogSegment> segment(
-      new ReadableLogSegment(active_segment_->path(), readable_file));
-  // Note: segment->header() will only contain an initialized PB if we
-  // wrote the header out.
-  RETURN_NOT_OK(segment->Init(active_segment_->header(),
-                              active_segment_->footer(),
-                              active_segment_->first_entry_offset()));
-  *readable_segment = std::move(segment);
-  return Status::OK();
+  return RollOver(finished_segment, new_readable_segment);
 }
 
 void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
@@ -635,15 +640,13 @@ Status SegmentAllocator::AllocateNewSegment() {
     allocation_state_ = kAllocationFinished;
   });
 
-  WritableFileOptions opts;
-  opts.sync_on_close = opts_->force_fsync_all;
   string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
   string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
   VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: "
<< path_tmpl;
-  unique_ptr<WritableFile> segment_file;
+  unique_ptr<RWFile> segment_file;
   Env* env = ctx_->fs_manager->env();
-  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(
-      opts, path_tmpl, &next_segment_path_, &segment_file),
+  RETURN_NOT_OK_PREPEND(env->NewTempRWFile(
+      RWFileOptions(), path_tmpl, &next_segment_path_, &segment_file),
                         "could not create next WAL segment");
   next_segment_file_.reset(segment_file.release());
   VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
@@ -658,7 +661,8 @@ Status SegmentAllocator::AllocateNewSegment() {
                                                       FLAGS_fs_wal_dir_reserved_bytes));
     // TODO (perf) zero the new segments -- this could result in
     // additional performance improvements.
-    RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_),
+    RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(
+        0, max_segment_size_, RWFile::CHANGE_FILE_SIZE),
                           "could not preallocate next WAL segment");
   }
   return Status::OK();
@@ -680,7 +684,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
 
   // Create a new segment in memory.
   unique_ptr<WritableLogSegment> new_segment(
-      new WritableLogSegment(new_segment_path, std::move(next_segment_file_)));
+      new WritableLogSegment(new_segment_path, next_segment_file_));
 
   // Set up the new header and footer.
   LogSegmentHeaderPB header;
@@ -700,7 +704,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
     RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
     header.set_schema_version(schema_version_);
   }
-  RETURN_NOT_OK_PREPEND(new_segment->WriteHeaderAndOpen(header), "Failed to write header");
+  RETURN_NOT_OK_PREPEND(new_segment->WriteHeader(header), "Failed to write header");
 
   // Open the segment we just created in readable form; it is the caller's
   // responsibility to add it to the reader.
@@ -708,13 +712,8 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
   // TODO(todd): consider using a global FileCache here? With short log segments and
   // lots of tablets, this file descriptor usage may add up.
   {
-    unique_ptr<RandomAccessFile> readable_file;
-
-    RandomAccessFileOptions opts;
-    RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
     scoped_refptr<ReadableLogSegment> readable_segment(
-        new ReadableLogSegment(new_segment_path,
-                               shared_ptr<RandomAccessFile>(readable_file.release())));
+        new ReadableLogSegment(new_segment_path, std::move(next_segment_file_)));
     RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
     *new_readable_segment = std::move(readable_segment);
   }
@@ -728,7 +727,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
 }
 
 Status SegmentAllocator::RollOver(
-    scoped_refptr<ReadableLogSegment>* closed_segment,
+    scoped_refptr<ReadableLogSegment>* finished_segment,
     scoped_refptr<ReadableLogSegment>* new_readable_segment) {
   SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);
 
@@ -739,7 +738,7 @@ Status SegmentAllocator::RollOver(
   // If this isn't the first active segment, close it and return a reopened
   // segment reader so that the caller can update its log reader.
   if (active_segment_) {
-    RETURN_NOT_OK(CloseCurrentSegment(closed_segment));
+    RETURN_NOT_OK(FinishCurrentSegment(finished_segment));
   }
   RETURN_NOT_OK(SwitchToAllocatedSegment(new_readable_segment));
 
@@ -893,13 +892,13 @@ Status Log::WriteBatch(LogEntryBatch* entry_batch) {
     return Status::OK();
   }
 
-  scoped_refptr<ReadableLogSegment> closed_segment;
+  scoped_refptr<ReadableLogSegment> finished_segment;
   scoped_refptr<ReadableLogSegment> new_readable_segment;
   RETURN_NOT_OK(segment_allocator_.AllocateOrRollOverIfNecessary(
-      entry_batch_bytes, &closed_segment, &new_readable_segment));
-  if (closed_segment) {
+      entry_batch_bytes, &finished_segment, &new_readable_segment));
+  if (finished_segment) {
     // Must be done before a new segment is appended.
-    reader_->ReplaceLastSegment(std::move(closed_segment));
+    reader_->ReplaceLastSegment(std::move(finished_segment));
   }
   if (new_readable_segment) {
     reader_->AppendEmptySegment(std::move(new_readable_segment));
@@ -950,12 +949,12 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
 
 Status Log::AllocateSegmentAndRollOverForTests() {
   std::lock_guard<rw_spinlock> l(segment_idle_lock_);
-  scoped_refptr<ReadableLogSegment> closed_segment;
+  scoped_refptr<ReadableLogSegment> finished_segment;
   scoped_refptr<ReadableLogSegment> new_readable_segment;
   RETURN_NOT_OK(segment_allocator_.AllocateSegmentAndRollOver(
-      &closed_segment, &new_readable_segment));
-  if (closed_segment) {
-    reader_->ReplaceLastSegment(std::move(closed_segment));
+      &finished_segment, &new_readable_segment));
+  if (finished_segment) {
+    reader_->ReplaceLastSegment(std::move(finished_segment));
   }
   reader_->AppendEmptySegment(std::move(new_readable_segment));
   return Status::OK();
@@ -1153,10 +1152,12 @@ Status Log::Close() {
     }
   }
 
-  RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(/*closed_segment=*/ nullptr));
+  RETURN_NOT_OK(segment_allocator_.FinishCurrentSegment(
+      /*finished_segment=*/ nullptr));
   VLOG_WITH_PREFIX(1) << "Log closed";
 
   // Release FDs held by these objects.
+  segment_allocator_.active_segment_.reset();
   log_index_.reset();
   reader_.reset();
   return Status::OK();
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index a0c7540..b16d074 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -55,7 +55,7 @@ namespace kudu {
 
 class CompressionCodec;
 class FsManager;
-class WritableFile;
+class RWFile;
 
 namespace log {
 
@@ -120,12 +120,12 @@ class SegmentAllocator {
   // 'write_size_bytes' is the expected size of the next write; if the active
   // segment would go above the max segment size, a new segment is allocated.
   //
-  // In the event of a roll over, 'closed_segment' contains a new segment reader
-  // for the just-closed segment (if there was one), and 'new_readable_segment'
-  // contains the newly active segment, reopened for reading.
+  // In the event of a roll over, 'finished_segment' contains a new segment
+  // reader for the just-finished segment (if there was one), and
+  // 'new_readable_segment' contains the newly active segment, reopened for reading.
   Status AllocateOrRollOverIfNecessary(
       uint32_t write_size_bytes,
-      scoped_refptr<ReadableLogSegment>* closed_segment,
+      scoped_refptr<ReadableLogSegment>* finished_segment,
       scoped_refptr<ReadableLogSegment>* new_readable_segment);
 
 
@@ -134,9 +134,9 @@ class SegmentAllocator {
 
   // Syncs the current segment and writes out the footer.
   //
-  // If 'closed_segment' is not null, it will contain a new ReadableLogSegment
-  // corresponding to the segment that was just closed.
-  Status CloseCurrentSegment(scoped_refptr<ReadableLogSegment>* closed_segment);
+  // If 'finished_segment' is not null, it will contain a new ReadableLogSegment
+  // corresponding to the segment that was just finished.
+  Status FinishCurrentSegment(scoped_refptr<ReadableLogSegment>* finished_segment);
 
   // Update the footer based on the written 'batch', e.g. to track the
   // last-written OpId.
@@ -165,17 +165,14 @@ class SegmentAllocator {
   // This is not thread-safe. It is up to the caller to ensure this does not
   // interfere with the append thread's attempts to switch log segments.
   //
-  // If there was a previous active segment, 'closed_segment' contains a new
+  // If there was a previous active segment, 'finished_segment' contains a new
   // segment reader built for that segment.
   //
   // 'new_readable_segment' contains the newly active segment, reopened for reading.
   Status AllocateSegmentAndRollOver(
-      scoped_refptr<ReadableLogSegment>* closed_segment,
+      scoped_refptr<ReadableLogSegment>* finished_segment,
       scoped_refptr<ReadableLogSegment>* new_readable_segment);
 
-  // Returns a readable segment pointing at the most recently closed segment.
-  Status GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment);
-
   // Sets the schema and version to be used for the next allocated segment.
   void SetSchemaForNextSegment(Schema schema, uint32_t version);
 
@@ -200,11 +197,11 @@ class SegmentAllocator {
   // Waits for any on-going allocation to complete and rolls over onto the
   // allocated segment, swapping out the previous active segment if it existed.
   //
-  // If there was a previous active segment, 'closed_segment' contains a new
+  // If there was a previous active segment, 'finished_segment' contains a new
   // segment reader built for that segment.
   //
   // 'new_readable_segment' contains the newly active segment, reopened for reading.
-  Status RollOver(scoped_refptr<ReadableLogSegment>* closed_segment,
+  Status RollOver(scoped_refptr<ReadableLogSegment>* finished_segment,
                   scoped_refptr<ReadableLogSegment>* new_readable_segment);
 
   // Hooks used to inject faults into the allocator.
@@ -212,7 +209,7 @@ class SegmentAllocator {
 
   // Descriptors for the segment file that should be used as the next active
   // segment.
-  std::shared_ptr<WritableFile> next_segment_file_;
+  std::shared_ptr<RWFile> next_segment_file_;
   std::string next_segment_path_;
 
   // Contains state shared by various Log-related classs.
@@ -235,7 +232,7 @@ class SegmentAllocator {
   bool sync_disabled_;
 
   // A footer being prepared for the current segment.
-  // When the segment is closed, it will be written.
+  // When the segment is finished, it will be written.
   LogSegmentFooterPB footer_;
 
   // The currently active segment being written.
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 0ea8b33..9783630 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -41,7 +41,6 @@
 #include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
-#include "kudu/util/env_util.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -262,21 +261,23 @@ Status ReadableLogSegment::Open(Env* env,
                                 const string& path,
                                 scoped_refptr<ReadableLogSegment>* segment) {
   VLOG(1) << "Parsing wal segment: " << path;
-  shared_ptr<RandomAccessFile> readable_file;
-  RETURN_NOT_OK_PREPEND(env_util::OpenFileForRandom(env, path, &readable_file),
+  unique_ptr<RWFile> file;
+  RWFileOptions opts;
+  opts.mode = Env::MUST_EXIST;
+  RETURN_NOT_OK_PREPEND(env->NewRWFile(opts, path, &file),
                         "Unable to open file for reading");
 
-  segment->reset(new ReadableLogSegment(path, readable_file));
+  segment->reset(new ReadableLogSegment(path, shared_ptr<RWFile>(file.release())));
   RETURN_NOT_OK_PREPEND((*segment)->Init(), "Unable to initialize segment");
   return Status::OK();
 }
 
 ReadableLogSegment::ReadableLogSegment(
-    std::string path, shared_ptr<RandomAccessFile> readable_file)
+    string path, shared_ptr<RWFile> file)
     : path_(std::move(path)),
       file_size_(0),
       readable_to_offset_(0),
-      readable_file_(std::move(readable_file)),
+      file_(std::move(file)),
       codec_(nullptr),
       is_initialized_(false),
       footer_was_rebuilt_(false) {}
@@ -355,7 +356,7 @@ Status ReadableLogSegment::InitCompressionCodec() {
   return Status::OK();
 }
 
-const int64_t ReadableLogSegment::readable_up_to() const {
+int64_t ReadableLogSegment::readable_up_to() const {
   return readable_to_offset_.Load();
 }
 
@@ -403,7 +404,7 @@ Status ReadableLogSegment::ReadFileSize() {
   // Env uses uint here, even though we generally prefer signed ints to avoid
   // underflow bugs. Use a local to convert.
   uint64_t size;
-  RETURN_NOT_OK_PREPEND(readable_file_->Size(&size), "Unable to read file size");
+  RETURN_NOT_OK_PREPEND(file_->Size(&size), "Unable to read file size");
   file_size_.Store(size);
   if (size == 0) {
     VLOG(1) << "Log segment file $0 is zero-length: " << path();
@@ -428,8 +429,8 @@ Status ReadableLogSegment::ReadHeader() {
   LogSegmentHeaderPB header;
 
   // Read and parse the log segment header.
-  RETURN_NOT_OK_PREPEND(readable_file_->Read(kLogSegmentHeaderMagicAndHeaderLength,
-                                             header_slice),
+  RETURN_NOT_OK_PREPEND(file_->Read(kLogSegmentHeaderMagicAndHeaderLength,
+                                    header_slice),
                         "Unable to read fully");
 
   RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&header,
@@ -452,7 +453,7 @@ Status ReadableLogSegment::ReadHeader() {
 Status ReadableLogSegment::ReadHeaderMagicAndHeaderLength(uint32_t *len) const {
   uint8_t scratch[kLogSegmentHeaderMagicAndHeaderLength];
   Slice slice(scratch, kLogSegmentHeaderMagicAndHeaderLength);
-  RETURN_NOT_OK(readable_file_->Read(0, slice));
+  RETURN_NOT_OK(file_->Read(0, slice));
   RETURN_NOT_OK(ParseHeaderMagicAndHeaderLength(slice, len));
   return Status::OK();
 }
@@ -510,7 +511,7 @@ Status ReadableLogSegment::ReadFooter() {
   LogSegmentFooterPB footer;
 
   // Read and parse the log segment footer.
-  RETURN_NOT_OK_PREPEND(readable_file_->Read(footer_offset, footer_slice),
+  RETURN_NOT_OK_PREPEND(file_->Read(footer_offset, footer_slice),
                         "Footer not found. Could not read fully.");
 
   RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&footer,
@@ -527,8 +528,8 @@ Status ReadableLogSegment::ReadFooterMagicAndFooterLength(uint32_t *len) const {
   Slice slice(scratch, kLogSegmentFooterMagicAndFooterLength);
 
   CHECK_GT(file_size(), kLogSegmentFooterMagicAndFooterLength);
-  RETURN_NOT_OK(readable_file_->Read(file_size() - kLogSegmentFooterMagicAndFooterLength,
-                                     slice));
+  RETURN_NOT_OK(file_->Read(file_size() - kLogSegmentFooterMagicAndFooterLength,
+                            slice));
 
   RETURN_NOT_OK(ParseFooterMagicAndFooterLength(slice, len));
   return Status::OK();
@@ -590,7 +591,7 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(
        offset += kChunkSize - entry_header_size()) {
     int rem = std::min<int64_t>(file_size() - offset, kChunkSize);
     Slice chunk(buf.get(), rem);
-    RETURN_NOT_OK(readable_file()->Read(offset, chunk));
+    RETURN_NOT_OK(file_->Read(offset, chunk));
 
     // Optimization for the case where a chunk is all zeros -- this is common in the
     // case of pre-allocated files. This avoids a lot of redundant CRC calculation.
@@ -639,7 +640,7 @@ Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header,
   const size_t header_size = entry_header_size();
   uint8_t scratch[header_size];
   Slice slice(scratch, header_size);
-  RETURN_NOT_OK_PREPEND(readable_file()->Read(*offset, slice),
+  RETURN_NOT_OK_PREPEND(file_->Read(*offset, slice),
                         "Could not read log entry header");
 
   *status_detail = DecodeEntryHeader(slice, header);
@@ -717,7 +718,7 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
   }
   tmp_buf->resize(buf_len);
   Slice entry_batch_slice(tmp_buf->data(), header.msg_length_compressed);
-  Status s =  readable_file()->Read(*offset, entry_batch_slice);
+  Status s =  file_->Read(*offset, entry_batch_slice);
 
   if (!s.ok()) return Status::IOError(Substitute("Could not read entry. Cause: $0",
                                                  s.ToString()));
@@ -755,14 +756,14 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
 }
 
 WritableLogSegment::WritableLogSegment(string path,
-                                       shared_ptr<WritableFile> writable_file)
+                                       shared_ptr<RWFile> file)
     : path_(std::move(path)),
-      writable_file_(std::move(writable_file)),
+      file_(std::move(file)),
       is_header_written_(false),
       is_footer_written_(false),
       written_offset_(0) {}
 
-Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header) {
+Status WritableLogSegment::WriteHeader(const LogSegmentHeaderPB& new_header) {
   MAYBE_FAULT(FLAGS_fault_crash_before_write_log_segment_header);
 
   DCHECK(!IsHeaderWritten()) << "Can only call WriteHeader() once";
@@ -776,7 +777,7 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head
   PutFixed32(&buf, new_header.ByteSize());
   // Then Serialize the PB.
   pb_util::AppendToString(new_header, &buf);
-  RETURN_NOT_OK(writable_file_->Append(Slice(buf)));
+  RETURN_NOT_OK(file_->Write(0, Slice(buf)));
 
   header_.CopyFrom(new_header);
   first_entry_offset_ = buf.size();
@@ -786,9 +787,8 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head
   return Status::OK();
 }
 
-Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) {
-  TRACE_EVENT1("log", "WritableLogSegment::WriteFooterAndClose",
-               "path", path_);
+Status WritableLogSegment::WriteFooter(const LogSegmentFooterPB& footer) {
+  TRACE_EVENT1("log", "WritableLogSegment::WriteFooter", "path", path_);
   DCHECK(IsHeaderWritten());
   DCHECK(!IsFooterWritten());
   DCHECK(footer.IsInitialized()) << footer.InitializationErrorString();
@@ -798,14 +798,12 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
   buf.append(kLogSegmentFooterMagicString);
   PutFixed32(&buf, footer.ByteSize());
 
-  RETURN_NOT_OK_PREPEND(writable_file_->Append(Slice(buf)), "Could not write the footer");
+  RETURN_NOT_OK_PREPEND(file_->Write(written_offset_, Slice(buf)),
+                        "Could not write the footer");
 
   footer_.CopyFrom(footer);
-  is_footer_written_ = true;
-
-  RETURN_NOT_OK(writable_file_->Close());
-
   written_offset_ += buf.size();
+  is_footer_written_ = true;
 
   return Status::OK();
 }
@@ -841,7 +839,7 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data,
   Slice slices[2] = {
     Slice(header_buf, arraysize(header_buf)),
     data_to_write };
-  RETURN_NOT_OK(writable_file_->AppendV(slices));
+  RETURN_NOT_OK(file_->WriteV(written_offset_, slices));
   written_offset_ += arraysize(header_buf) + data_to_write.size();
   return Status::OK();
 }
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index a858cce..27e8159 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -181,8 +181,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
                      scoped_refptr<ReadableLogSegment>* segment);
 
   // Build a readable segment to read entries from the provided path.
-  ReadableLogSegment(std::string path,
-                     std::shared_ptr<RandomAccessFile> readable_file);
+  ReadableLogSegment(std::string path, std::shared_ptr<RWFile> file);
 
   // Initialize the ReadableLogSegment.
   // This initializer provides methods for avoiding disk IO when creating a
@@ -225,7 +224,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   }
 
   // Returns the parent directory where log segments are stored.
-  const std::string &path() const {
+  const std::string& path() const {
     return path_;
   }
 
@@ -253,22 +252,22 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
     return footer_;
   }
 
-  const std::shared_ptr<RandomAccessFile> readable_file() const {
-    return readable_file_;
+  std::shared_ptr<RWFile> file() const {
+    return file_;
   }
 
-  const int64_t file_size() const {
+  int64_t file_size() const {
     return file_size_.Load();
   }
 
-  const int64_t first_entry_offset() const {
+  int64_t first_entry_offset() const {
     return first_entry_offset_;
   }
 
   // Returns the full size of the file, if the segment is closed and has
   // a footer, or the offset where the last written, non corrupt entry
   // ends.
-  const int64_t readable_up_to() const;
+  int64_t readable_up_to() const;
 
   // Return the expected length of entry headers in this log segment.
   // Versions of Kudu older than 1.3 used a different log entry header format.
@@ -372,7 +371,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 
   const std::string path_;
 
-  // The size of the readable file.
+  // The size of the file.
   // This is set by Init(). In the case of a log being written to,
   // this may be increased by UpdateReadableToOffset()
   AtomicInt<int64_t> file_size_;
@@ -385,8 +384,10 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // offset while an async reader is reading the segment's entries.
   AtomicInt<int64_t> readable_to_offset_;
 
-  // a readable file for a log segment (used on replay)
-  const std::shared_ptr<RandomAccessFile> readable_file_;
+  // File handle for a log segment (used on replay).
+  //
+  // Despite being read-write, we only ever use its read methods.
+  const std::shared_ptr<RWFile> file_;
 
   // Compression codec used to decompress entries in this file.
   const CompressionCodec* codec_;
@@ -412,22 +413,13 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 class WritableLogSegment {
  public:
   WritableLogSegment(std::string path,
-                     std::shared_ptr<WritableFile> writable_file);
+                     std::shared_ptr<RWFile> file);
 
   // Opens the segment by writing the header.
-  Status WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header);
-
-  // Closes the segment by writing the footer and then actually closing the
-  // underlying WritableFile.
-  Status WriteFooterAndClose(const LogSegmentFooterPB& footer);
+  Status WriteHeader(const LogSegmentHeaderPB& new_header);
 
-  bool IsClosed() {
-    return IsHeaderWritten() && IsFooterWritten();
-  }
-
-  int64_t Size() const {
-    return writable_file_->Size();
-  }
+  // Finishes the segment by writing the footer.
+  Status WriteFooter(const LogSegmentFooterPB& footer);
 
   // Appends the provided batch of data, including a header
   // and checksum. If 'codec' is not NULL, compresses the batch.
@@ -435,9 +427,9 @@ class WritableLogSegment {
   // Write a compressed entry to the log.
   Status WriteEntryBatch(const Slice& data, const CompressionCodec* codec);
 
-  // Makes sure the I/O buffers in the underlying writable file are flushed.
+  // Makes sure the I/O buffers belonging to the underlying file handle are flushed.
   Status Sync() {
-    return writable_file_->Sync();
+    return file_->Sync();
   }
 
   // Indicate that the segment has not been written for some period of time.
@@ -464,26 +456,30 @@ class WritableLogSegment {
   }
 
   // Returns the parent directory where log segments are stored.
-  const std::string &path() const {
+  const std::string& path() const {
     return path_;
   }
 
-  const int64_t first_entry_offset() const {
+  std::shared_ptr<RWFile> file() const {
+    return file_;
+  }
+
+  int64_t first_entry_offset() const {
     return first_entry_offset_;
   }
 
-  const int64_t written_offset() const {
+  int64_t written_offset() const {
     return written_offset_;
   }
 
  private:
   FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
 
-  // The path to the log file.
+  // The path to the log segment.
   const std::string path_;
 
-  // The writable file to which this LogSegment will be written.
-  const std::shared_ptr<WritableFile> writable_file_;
+  // The file handle belonging to the log segment.
+  const std::shared_ptr<RWFile> file_;
 
   bool is_header_written_;
 
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
index 25f076c..9f8afb5 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -485,7 +485,7 @@ TEST_F(TabletCopyServiceTest, TestFetchLog) {
   int64_t size = segment->file_size();
   scratch.resize(size);
   Slice slice(scratch.data(), size);
-  ASSERT_OK(segment->readable_file()->Read(0, slice));
+  ASSERT_OK(segment->file()->Read(0, slice));
 
   AssertDataEqual(slice.data(), slice.size(), resp.chunk());
 }
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 6f38557..b919ce3 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -328,7 +328,7 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
   DCHECK(init_once_.init_succeeded());
   RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
                         "Tablet copy source could not get log segment");
-  ImmutableRandomAccessFileInfo* file_info;
+  ImmutableRWFileInfo* file_info;
   RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
   RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
                                    Substitute("log segment $0", segment_seqno),
@@ -425,7 +425,7 @@ Status TabletCopySourceSession::OpenLogSegment(uint64_t segment_seqno) {
   CHECK_EQ(log_segment->header().sequence_number(), segment_seqno);
 
   uint64_t size = log_segment->readable_up_to();
-  Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->readable_file(), size);
+  Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->file(), size);
   if (!s.ok()) {
     s = s.CloneAndPrepend(
             Substitute("Error accessing data for log segment with seqno $0",
@@ -436,7 +436,7 @@ Status TabletCopySourceSession::OpenLogSegment(uint64_t segment_seqno) {
 }
 
 Status TabletCopySourceSession::FindLogSegment(uint64_t segment_seqno,
-                                              ImmutableRandomAccessFileInfo** file_info,
+                                              ImmutableRWFileInfo** file_info,
                                               TabletCopyErrorPB::Code* error_code) {
   if (!FindCopy(logs_, segment_seqno, file_info)) {
     *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND;
diff --git a/src/kudu/tserver/tablet_copy_source_session.h b/src/kudu/tserver/tablet_copy_source_session.h
index af15c45..b3e9ef3 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -59,14 +59,14 @@ struct TabletCopySourceMetrics {
   scoped_refptr<AtomicGauge<int32_t>> open_source_sessions;
 };
 
-// Caches file size and holds a shared_ptr reference to a RandomAccessFile.
-// Assumes that the file underlying the RandomAccessFile is immutable.
-struct ImmutableRandomAccessFileInfo {
-  std::shared_ptr<RandomAccessFile> readable;
+// Caches file size and holds a shared_ptr reference to a RWFile.
+// Assumes that the file underlying the RWFile is immutable.
+struct ImmutableRWFileInfo {
+  std::shared_ptr<RWFile> readable;
   int64_t size;
 
-  ImmutableRandomAccessFileInfo(std::shared_ptr<RandomAccessFile> readable,
-                                int64_t size)
+  ImmutableRWFileInfo(std::shared_ptr<RWFile> readable,
+                      int64_t size)
       : readable(std::move(readable)), size(size) {}
 
   Status Read(uint64_t offset, Slice data) const {
@@ -170,7 +170,7 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
       ImmutableReadableBlockInfo*,
       BlockIdHash,
       BlockIdEqual> BlockMap;
-  typedef std::unordered_map<uint64_t, ImmutableRandomAccessFileInfo*> LogMap;
+  typedef std::unordered_map<uint64_t, ImmutableRWFileInfo*> LogMap;
 
   ~TabletCopySourceSession();
 
@@ -194,7 +194,7 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
 
   // Look up log segment in cache or log segment map.
   Status FindLogSegment(uint64_t segment_seqno,
-                        ImmutableRandomAccessFileInfo** file_info,
+                        ImmutableRWFileInfo** file_info,
                         TabletCopyErrorPB::Code* error_code);
 
   // Unregister log anchor, if it's registered.


Mime
View raw message