kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject incubator-kudu git commit: log: Mark allocation finished even if allocation had an error
Date Wed, 22 Jun 2016 21:27:54 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master c2d68da78 -> 7d5788225


log: Mark allocation finished even if allocation had an error

This fixes two bugs:

1. If a disk preallocation fails, we enter a "stuck" state in which we
   cannot preallocate a new segment. Failures to allocate or append
   should be propagated up.

2. If anything fails in DoAppend() we will attempt to delete
   ReplicateMsg member objects in LogEntryPB twice, resulting in a
   segfault.

Added a test that crashes without the code changes in log.cc

Change-Id: If22bf946a42d0ec32c35164acd9e6e6cef18dcc3
Reviewed-on: http://gerrit.cloudera.org:8080/3234
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/7d578822
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/7d578822
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/7d578822

Branch: refs/heads/master
Commit: 7d5788225e4fc21bc376e7732502e8789aad698f
Parents: c2d68da
Author: Mike Percy <mpercy@apache.org>
Authored: Sun Jun 19 22:02:42 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed Jun 22 21:26:02 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc |  1 -
 src/kudu/consensus/log-test-base.h    |  3 +--
 src/kudu/consensus/log-test.cc        | 15 +++++++++++
 src/kudu/consensus/log.cc             | 42 ++++++++++++++++++------------
 src/kudu/consensus/log.h              | 10 ++++---
 src/kudu/consensus/log_reader.cc      |  2 +-
 src/kudu/consensus/log_reader.h       |  2 +-
 src/kudu/consensus/raft_consensus.cc  |  1 -
 8 files changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 5220e53..0254237 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -68,7 +68,6 @@ namespace kudu {
 namespace consensus {
 
 using log::Log;
-using log::LogEntryBatch;
 using std::shared_ptr;
 using rpc::Messenger;
 using rpc::RpcController;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 1f2c192..05223ba 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -111,8 +111,7 @@ static Status AppendNoOpsToLogSync(const scoped_refptr<Clock>& clock,
   Synchronizer s;
   RETURN_NOT_OK(log->AsyncAppendReplicates(replicates,
                                            s.AsStatusCallback()));
-  s.Wait();
-  return Status::OK();
+  return s.Wait();
 }
 
 static Status AppendNoOpToLogSync(const scoped_refptr<Clock>& clock,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 03fa8ec..c9fc41b 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -34,6 +34,7 @@ DEFINE_int32(num_batches, 10000,
              "Number of batches to write to/read from the Log in TestWriteManyBatches");
 
 DECLARE_int32(log_min_segments_to_retain);
+DECLARE_double(log_inject_io_error_on_preallocate_fraction);
 
 namespace kudu {
 namespace log {
@@ -1048,5 +1049,19 @@ TEST_F(LogTest, TestGetMaxIndexesToSegmentSizeMap) {
   ASSERT_EQ(0, max_idx_to_segment_size.size());
 }
 
+// Regression test. Check that failed preallocation returns an error instead of
+// hanging.
+TEST_F(LogTest, TestFailedLogPreAllocation) {
+  options_.async_preallocate_segments = false;
+  ASSERT_OK(BuildLog());
+
+  log_->SetMaxSegmentSizeForTests(1);
+  FLAGS_log_inject_io_error_on_preallocate_fraction = 1.0;
+  OpId opid = MakeOpId(1, 1);
+  Status s = AppendNoOp(&opid);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Injected IOError");
+}
+
 } // namespace log
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 3e10523..b577339 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -44,6 +44,7 @@
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/threadpool.h"
@@ -101,6 +102,12 @@ DEFINE_double(log_inject_io_error_on_append_fraction, 0.0,
 TAG_FLAG(log_inject_io_error_on_append_fraction, unsafe);
 TAG_FLAG(log_inject_io_error_on_append_fraction, runtime);
 
+DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
+              "Fraction of the time when the log will fail to preallocate and return an IOError. "
+              "(For testing only!)");
+TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
+TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
+
 // Validate that log_min_segments_to_retain >= 1
 static bool ValidateLogsToRetain(const char* flagname, int value) {
   if (value >= 1) {
@@ -473,7 +480,7 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
   return Status::OK();
 }
 
-Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) {
+Status Log::DoAppend(LogEntryBatch* entry_batch) {
   size_t num_entries = entry_batch->count();
   DCHECK_GT(num_entries, 0) << "Cannot call DoAppend() with zero entries reserved";
 
@@ -541,15 +548,6 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) {
   CHECK_OK(UpdateIndexForBatch(*entry_batch, start_offset));
   UpdateFooterForBatch(entry_batch);
 
-  // For REPLICATE batches, we expect the caller to free the actual entries if
-  // caller_owns_operation is set.
-  if (entry_batch->type_ == REPLICATE && caller_owns_operation) {
-    for (int i = 0; i < entry_batch->entry_batch_pb_->entry_size(); i++) {
-      LogEntryPB* entry_pb = entry_batch->entry_batch_pb_->mutable_entry(i);
-      entry_pb->release_replicate();
-    }
-  }
-
   return Status::OK();
 }
 
@@ -677,7 +675,7 @@ Status Log::Append(LogEntryPB* phys_entry) {
   Status s = entry_batch.Serialize();
   if (s.ok()) {
     entry_batch.state_ = LogEntryBatch::kEntryReady;
-    s = DoAppend(&entry_batch, false);
+    s = DoAppend(&entry_batch);
     if (s.ok()) {
       s = Sync();
     }
@@ -824,7 +822,7 @@ Status Log::Close() {
       return Status::OK();
 
     default:
-      return Status::IllegalState(Substitute("Bad state for Close() $0", log_state_));
+      return Status::IllegalState(Substitute("Log not open. State: $0", log_state_));
   }
 }
 
@@ -843,10 +841,19 @@ Status Log::PreAllocateNewSegment() {
   TRACE_EVENT1("log", "PreAllocateNewSegment", "file", next_segment_path_);
   CHECK_EQ(allocation_state(), kAllocationInProgress);
 
+  // We must mark allocation as finished when returning from this method.
+  auto alloc_finished = MakeScopedCleanup([&] () {
+    std::lock_guard<boost::shared_mutex> l(allocation_lock_);
+    allocation_state_ = kAllocationFinished;
+  });
+
   WritableFileOptions opts;
   opts.sync_on_close = force_sync_all_;
   RETURN_NOT_OK(CreatePlaceholderSegment(opts, &next_segment_path_, &next_segment_file_));
 
+  MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_preallocate_fraction,
+                       Status::IOError("Injected IOError in Log::PreAllocateNewSegment()"));
+
   if (options_.preallocate_segments) {
     TRACE("Preallocating $0 byte segment in $1", max_segment_size_, next_segment_path_);
     // TODO (perf) zero the new segments -- this could result in
@@ -854,10 +861,6 @@ Status Log::PreAllocateNewSegment() {
     RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));
   }
 
-  {
-    std::lock_guard<boost::shared_mutex> lock_guard(allocation_lock_);
-    allocation_state_ = kAllocationFinished;
-  }
   return Status::OK();
 }
 
@@ -976,6 +979,13 @@ LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
 }
 
 LogEntryBatch::~LogEntryBatch() {
+  if (type_ == REPLICATE && entry_batch_pb_) {
+    for (LogEntryPB& entry : *entry_batch_pb_->mutable_entry()) {
+      // ReplicateMsg elements are owned by and must be freed by the caller
+      // (e.g. the LogCache).
+      entry.release_replicate();
+    }
+  }
 }
 
 void LogEntryBatch::MarkReserved() {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index db1a3da..331b9bf 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -117,7 +117,6 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Synchronously append a new entry to the log.
   // Log does not take ownership of the passed 'entry'.
-  // TODO get rid of this method, transition to the asynchronous API
   Status Append(LogEntryPB* entry);
 
   // Append the given set of replicate messages, asynchronously.
@@ -283,9 +282,7 @@ class Log : public RefCountedThreadSafe<Log> {
   // AppenderThread. If 'caller_owns_operation' is true, then the
   // 'operation' field of the entry will be released after the entry
   // is appended.
-  // TODO once Append() is removed, 'caller_owns_operation' and
-  // associated logic will no longer be needed.
-  Status DoAppend(LogEntryBatch* entry, bool caller_owns_operation = true);
+  Status DoAppend(LogEntryBatch* entry);
 
   // Update footer_builder_ to reflect the log indexes seen in 'batch'.
   void UpdateFooterForBatch(LogEntryBatch* batch);
@@ -404,8 +401,13 @@ class Log : public RefCountedThreadSafe<Log> {
 // This class represents a batch of operations to be written and
 // synced to the log. It is opaque to the user and is managed by the
 // Log class.
+//
 // A single batch must have only one type of entries in it (eg only
 // REPLICATEs or only COMMITs).
+//
+// The ReplicateMsg sub-elements of each LogEntryPB within the LogEntryBatchPB
+// 'entry_batch_pb_' are not owned by the LogEntryPBs, and at LogEntryBatch
+// destruction time they are released.
 class LogEntryBatch {
  public:
   ~LogEntryBatch();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index c1ee1eb..39a8c2d 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -330,7 +330,7 @@ Status LogReader::ReadReplicatesInRange(const int64_t starting_at,
     RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
                           Substitute("Failed to read log index for op $0", index));
 
-    // Since a given LogEntryBatch may contain multiple REPLICATE messages,
+    // Since a given LogEntryBatchPB may contain multiple REPLICATE messages,
     // it's likely that this index entry points to the same batch as the previous
     // one. If that's the case, we've already read this REPLICATE and we can
     // skip reading the batch again.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/log_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 2cf069a..d17f041 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -167,7 +167,7 @@ class LogReader {
   // written to.
   void UpdateLastSegmentOffset(int64_t readable_to_offset);
 
-  // Read the LogEntryBatch pointed to by the provided index entry.
+  // Read the LogEntryBatchPB pointed to by the provided index entry.
   // 'tmp_buf' is used as scratch space to avoid extra allocation.
   Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                   faststring* tmp_buf,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7d578822/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index ebe1938..761126e 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -141,7 +141,6 @@ int GetFailureMonitorCheckStddevMs() {
 namespace kudu {
 namespace consensus {
 
-using log::LogEntryBatch;
 using std::shared_ptr;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;


Mime
View raw message