kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [4/5] incubator-kudu git commit: log: use iterator interface for reading log while rebuilding footer
Date Mon, 16 May 2016 21:27:26 GMT
log: use iterator interface for reading log while rebuilding footer

We no longer read all of the entries in the segment into memory,
but rather process them one by one. This should improve performance
of this operation a little bit, but more importantly, it allows
us to remove some complexity from the ReadEntries() function.

Change-Id: Ic0547f43ba741a8f189fd0419319b17725d64f95
Reviewed-on: http://gerrit.cloudera.org:8080/3057
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/76c88a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/76c88a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/76c88a6f

Branch: refs/heads/master
Commit: 76c88a6fc12475638e60543b9f9aa7b3d1a9073e
Parents: d138dba
Author: Todd Lipcon <todd@apache.org>
Authored: Thu May 12 18:53:29 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Mon May 16 21:26:51 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log.cc      | 10 +-----
 src/kudu/consensus/log.h       |  1 -
 src/kudu/consensus/log_util.cc | 69 ++++++++++++++++++-------------------
 src/kudu/consensus/log_util.h  | 11 +++---
 4 files changed, 40 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76c88a6f/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index ef68df8..7987304 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -572,15 +572,7 @@ void Log::UpdateFooterForBatch(LogEntryBatch* batch) {
   if (batch->type_ == REPLICATE) {
     // Update the index bounds for the current segment.
     for (const LogEntryPB& entry_pb : batch->entry_batch_pb_->entry()) {
-      int64_t index = entry_pb.replicate().id().index();
-      if (!footer_builder_.has_min_replicate_index() ||
-          index < footer_builder_.min_replicate_index()) {
-        footer_builder_.set_min_replicate_index(index);
-      }
-      if (!footer_builder_.has_max_replicate_index() ||
-          index > footer_builder_.max_replicate_index()) {
-        footer_builder_.set_max_replicate_index(index);
-      }
+      UpdateFooterForReplicateEntry(entry_pb, &footer_builder_);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76c88a6f/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index b823648..766dd6f 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -228,7 +228,6 @@ class Log : public RefCountedThreadSafe<Log> {
   //
   // This method is thread-safe.
   void SetSchemaForNextLogSegment(const Schema& schema, uint32_t version);
-
  private:
   friend class LogTest;
   friend class LogTestBase;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76c88a6f/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 65da872..2c29921 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -333,37 +333,31 @@ Status ReadableLogSegment::RebuildFooterByScanning() {
                "path", path_);
 
   DCHECK(!footer_.IsInitialized());
-  vector<LogEntryPB*> entries;
-  ElementDeleter deleter(&entries);
-  int64_t end_offset = 0;
-  RETURN_NOT_OK(ReadEntries(&entries, &end_offset));
-
-  footer_.set_num_entries(entries.size());
-
-  // Rebuild the min/max replicate index (by scanning)
-  for (const LogEntryPB* entry : entries) {
-    if (entry->has_replicate()) {
-      int64_t index = entry->replicate().id().index();
-      // TODO: common code with Log::UpdateFooterForBatch
-      if (!footer_.has_min_replicate_index() ||
-          index < footer_.min_replicate_index()) {
-        footer_.set_min_replicate_index(index);
-      }
-      if (!footer_.has_max_replicate_index() ||
-          index > footer_.max_replicate_index()) {
-        footer_.set_max_replicate_index(index);
-      }
+
+  LogEntryReader reader(this);
+
+  LogSegmentFooterPB new_footer;
+  int num_entries = 0;
+  LogEntryPB entry;
+  while (true) {
+    Status s = reader.ReadNextEntry(&entry);
+    if (s.IsEndOfFile()) break;
+    RETURN_NOT_OK(s);
+
+    if (entry.has_replicate()) {
+      UpdateFooterForReplicateEntry(entry, &new_footer);
     }
+    num_entries++;
   }
 
+  new_footer.set_num_entries(num_entries);
+  footer_ = new_footer;
   DCHECK(footer_.IsInitialized());
-  DCHECK_EQ(entries.size(), footer_.num_entries());
   footer_was_rebuilt_ = true;
-
-  readable_to_offset_.Store(end_offset);
+  readable_to_offset_.Store(reader.offset());
 
   LOG(INFO) << "Successfully rebuilt footer for segment: " << path_
-            << " (valid entries through byte offset " << end_offset <<
")";
+            << " (valid entries through byte offset " << reader.offset() <<
")";
   return Status::OK();
 }
 
@@ -553,22 +547,11 @@ Status ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice
&data,
   return Status::OK();
 }
 
-Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries,
-                                       int64_t* end_offset) {
+Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries) {
   TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries",
                "path", path_);
   LogEntryReader reader(this);
 
-  // On any exit from this function, callers expect 'end_offset' to be
-  // set to the offset of the last successfully read operation.
-  // TODO: once RebuildFooterByScanning() is converted to use the iterator
-  // directly, we can remove the 'end_offset' argument.
-  auto set_end_offset = MakeScopedCleanup([&] {
-      if (end_offset != nullptr) {
-        *end_offset = reader.offset();
-      }
-    });
-
   while (true) {
     unique_ptr<LogEntryPB> entry(new LogEntryPB());
     Status s = reader.ReadNextEntry(entry.get());
@@ -839,5 +822,19 @@ bool IsLogFileName(const string& fname) {
   return true;
 }
 
+void UpdateFooterForReplicateEntry(const LogEntryPB& entry_pb,
+                                   LogSegmentFooterPB* footer) {
+  DCHECK(entry_pb.has_replicate());
+  int64_t index = entry_pb.replicate().id().index();
+  if (!footer->has_min_replicate_index() ||
+      index < footer->min_replicate_index()) {
+    footer->set_min_replicate_index(index);
+  }
+  if (!footer->has_max_replicate_index() ||
+      index > footer->max_replicate_index()) {
+    footer->set_max_replicate_index(index);
+  }
+}
+
 }  // namespace log
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76c88a6f/src/kudu/consensus/log_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 457c4b5..a4b2bae 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -187,11 +187,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment>
{
   // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all
   // the log entries read up to the corrupted one are returned in the 'entries'
   // vector.
-  //
-  // If 'end_offset' is not NULL, then returns the file offset following the last
-  // successfully read entry.
-  Status ReadEntries(std::vector<LogEntryPB*>* entries,
-                     int64_t* end_offset = NULL);
+  Status ReadEntries(std::vector<LogEntryPB*>* entries);
 
   // Rebuilds this segment's footer by scanning its entries.
   // This is an expensive operation as it reads and parses the whole segment
@@ -454,6 +450,11 @@ void CreateBatchFromAllocatedOperations(const std::vector<consensus::ReplicateRe
 // Checks if 'fname' is a correctly formatted name of log segment file.
 bool IsLogFileName(const std::string& fname);
 
+// Update 'footer' to reflect the given REPLICATE message 'entry_pb'.
+// In particular, updates the min/max seen replicate OpID.
+void UpdateFooterForReplicateEntry(
+    const LogEntryPB& entry_pb, LogSegmentFooterPB* footer);
+
 }  // namespace log
 }  // namespace kudu
 


Mime
View raw message