kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [1/6] kudu git commit: KUDU-1793: only update lbm container bookkeeping on success
Date Thu, 08 Dec 2016 16:14:33 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 1b3eeb3c7 -> 839bd6f9a


KUDU-1793: only update lbm container bookkeeping on success

This patch reverts part of commit abea8c6 by moving some bookkeeping out of
containers and into writable blocks, ensuring that the container's
accounting structures are only modified when a block has been written
successfully.

Additionally, since the aforementioned bug shipped in Kudu 1.1.0, the log
block manager (LBM) invariant of "all blocks are contiguous on disk" has been
relaxed somewhat: if we encounter an unexpected "hole", we'll place the next
block after it.

The new test relies on Env fault injection to simulate out-of-disk-space
errors. It triggered a crash 100% of the time without the fix.

I also snuck in a change to relax the behavior of the LBM at startup if it
encounters an unknown record type (for future-proofing), and changed
several LOG(FATAL) call sites to propagate a bad Status up the stack instead.

Change-Id: I49bc98c9f8b7dce0333f88cec85757fe122acfa4
Reviewed-on: http://gerrit.cloudera.org:8080/5399
Tested-by: Adar Dembo <adar@cloudera.com>
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/faa587c6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/faa587c6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/faa587c6

Branch: refs/heads/master
Commit: faa587c639aa9e5dcf3fac04259f46ba1921140a
Parents: 1b3eeb3
Author: Adar Dembo <adar@cloudera.com>
Authored: Wed Dec 7 02:52:32 2016 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Dec 8 04:07:39 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc | 107 ++++++++++++++++++++++
 src/kudu/fs/log_block_manager.cc  | 161 +++++++++++++++++++++------------
 src/kudu/fs/log_block_manager.h   |  10 +-
 src/kudu/master/sys_catalog.cc    |   2 +-
 src/kudu/util/env.h               |   6 ++
 src/kudu/util/env_posix.cc        |  19 ++++
 6 files changed, 242 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 2e19afd..74ff728 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -16,6 +16,9 @@
 // under the License.
 
 #include <memory>
+#include <unordered_set>
+#include <string>
+#include <vector>
 
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
@@ -39,9 +42,12 @@ using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_bool(never_fsync);
+
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
@@ -49,8 +55,11 @@ DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
 
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
+
 DECLARE_string(block_manager);
 
+DECLARE_double(env_inject_io_error_on_write_or_preallocate);
+
 // Generic block manager metrics.
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
@@ -1126,5 +1135,103 @@ TYPED_TEST(BlockManagerTest, TestDiskSpaceCheck) {
   }
 }
 
+// Regression test for KUDU-1793.
+TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
+  const int kNumTries = 3;
+  const int kNumBlockTries = 1000;
+  const int kNumAppends = 4;
+  const string kTestData = "asdf";
+
+  // Speed up the test.
+  FLAGS_never_fsync = true;
+
+  // Since we're appending so little data, reconfigure these to ensure quite a
+  // few containers and a good amount of preallocating.
+  FLAGS_log_container_max_size = 256 * 1024;
+  FLAGS_log_container_preallocate_bytes = 8 * 1024;
+
+  // Force some file operations to fail.
+  FLAGS_env_inject_io_error_on_write_or_preallocate = 0.2;
+
+  // Creates a block, writing the result to 'out' on success.
+  auto create_a_block = [&](BlockId* out) -> Status {
+    gscoped_ptr<WritableBlock> block;
+    RETURN_NOT_OK(this->bm_->CreateBlock(&block));
+    for (int i = 0; i < kNumAppends; i++) {
+      RETURN_NOT_OK(block->Append(kTestData));
+    }
+    RETURN_NOT_OK(block->Close());
+    *out = block->id();
+    return Status::OK();
+  };
+
+  // Reads a block given by 'id', comparing its contents to kTestData.
+  auto read_a_block = [&](const BlockId& id) -> Status {
+    gscoped_ptr<ReadableBlock> block;
+    RETURN_NOT_OK(this->bm_->OpenBlock(id, &block));
+    uint64_t size;
+    RETURN_NOT_OK(block->Size(&size));
+    CHECK_EQ(kNumAppends * kTestData.size(), size);
+
+    for (int i = 0; i < kNumAppends; i++) {
+      uint8_t buf[kTestData.size()];
+      Slice s;
+      RETURN_NOT_OK(block->Read(i * kNumAppends, sizeof(buf), &s, buf));
+      CHECK_EQ(kTestData, s);
+    }
+    return Status::OK();
+  };
+
+  // For each iteration:
+  // 1. Try to create kNumTries new blocks.
+  // 2. Try to delete every other block.
+  // 3. Read and test every block.
+  // 4. Restart the block manager, forcing the on-disk metadata to be reloaded.
+  unordered_set<BlockId, BlockIdHash> ids;
+  for (int attempt = 0; attempt < kNumTries; attempt++) {
+    int num_created = 0;
+    for (int i = 0; i < kNumBlockTries; i++) {
+      BlockId id;
+      Status s = create_a_block(&id);
+      if (s.ok()) {
+        InsertOrDie(&ids, id);
+        num_created++;
+      }
+    }
+    LOG(INFO) << Substitute("Successfully created $0 blocks on $1 attempts",
+                            num_created, kNumBlockTries);
+
+    int num_deleted = 0;
+    int num_deleted_attempts = 0;
+    for (auto it = ids.begin(); it != ids.end();) {
+      // TODO(adar): the lbm removes a block from its block map even if the
+      // on-disk deletion fails. When that's fixed, update this code to
+      // erase() only if s.ok().
+      Status s = this->bm_->DeleteBlock(*it);
+      it = ids.erase(it);
+      if (s.ok()) {
+        num_deleted++;
+      }
+      num_deleted_attempts++;
+
+      // Skip every other block.
+      if (it != ids.end()) {
+        it++;
+      }
+    }
+    LOG(INFO) << Substitute("Successfully deleted $0 blocks on $1 attempts",
+                            num_deleted, num_deleted_attempts);
+
+    for (const auto& id : ids) {
+      ASSERT_OK(read_a_block(id));
+    }
+
+    ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                       shared_ptr<MemTracker>(),
+                                       { GetTestDataDirectory() },
+                                       false));
+  }
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 7877cc4..129d9cc 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -196,10 +196,19 @@ class LogBlockContainer {
   // The on-disk effects of this call are made durable only after SyncData().
   Status DeleteBlock(int64_t offset, int64_t length);
 
-  // Appends 'data' to this container's data file.
+  // Preallocate enough space to ensure that an append of 'next_append_length'
+  // can be satisfied by this container. The offset of the beginning of this
+  // block must be provided in 'block_start_offset' (since container
+  // bookkeeping is only updated when a block is finished).
+  //
+  // Does nothing if preallocation is disabled.
+  Status EnsurePreallocated(int64_t block_start_offset,
+                            size_t next_append_length);
+
+  // Writes 'data' to this container's data file at offset 'offset'.
   //
   // The on-disk effects of this call are made durable only after SyncData().
-  Status AppendData(const Slice& data);
+  Status WriteData(int64_t offset, const Slice& data);
 
   // See RWFile::Read().
   Status ReadData(int64_t offset, size_t length,
@@ -254,7 +263,7 @@ class LogBlockContainer {
   // will round up the container data file's position.
   //
   // This function is thread unsafe.
-  void UpdateBytesWritten(int64_t more_bytes);
+  void UpdateBytesWritten(int64_t block_offset, size_t block_length);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -282,14 +291,9 @@ class LogBlockContainer {
                     shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
-  void CheckBlockRecord(const BlockRecordPB& record,
-                        uint64_t data_file_size) const;
-
-  // Preallocate enough space to ensure that an append of 'next_append_bytes'
-  // can be satisfied by this container.
-  //
-  // Does nothing if preallocation is disabled.
-  Status EnsurePreallocated(size_t next_append_bytes);
+  Status CheckBlockRecord(const BlockRecordPB& record,
+                          uint64_t data_file_size,
+                          uint64_t fs_block_size) const;
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -506,6 +510,8 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
 
   uint64_t data_file_size;
   RETURN_NOT_OK(data_file_->Size(&data_file_size));
+  uint64_t fs_block_size =
+      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
   deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
@@ -526,7 +532,7 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
       // that additional data has been written to the file.
       RETURN_NOT_OK(data_file_->Size(&data_file_size));
     }
-    CheckBlockRecord(record, data_file_size);
+    RETURN_NOT_OK(CheckBlockRecord(record, data_file_size, fs_block_size));
   }
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
@@ -564,18 +570,37 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
   return read_status;
 }
 
-void LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                         uint64_t data_file_size) const {
-  if (record.op_type() == CREATE &&
-      (!record.has_offset()  ||
-       !record.has_length()  ||
+Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
+                                           uint64_t data_file_size,
+                                           uint64_t fs_block_size) const {
+  if (record.op_type() == CREATE) {
+    if (!record.has_offset() ||
+        !record.has_length() ||
         record.offset() < 0  ||
-        record.length() < 0  ||
-        record.offset() + record.length() > data_file_size)) {
-    LOG(FATAL) << "Found malformed block record in data file: " << data_file_->filename()
-               << "\nRecord: " << record.DebugString()
-               << "\nData file size: " << data_file_size;
+        record.length() < 0) {
+      return Status::Corruption(Substitute(
+          "Found malformed block record in data file: $0\nRecord: $1\n",
+          data_file_->filename(), record.DebugString()));
+    }
+    if (record.offset() + record.length() > data_file_size) {
+      return Status::Corruption(Substitute(
+          "Found block extending beyond the end of data file: $0\n"
+          "Record: $1\nData file size: $2",
+          data_file_->filename(), record.DebugString(), data_file_size));
+    }
+
+    // We could also check that the record's offset is aligned with the
+    // underlying filesystem's block size, an invariant maintained by the log
+    // block manager. However, due to KUDU-1793, that invariant may have been
+    // broken, so we'll LOG but otherwise allow it.
+    if (record.offset() % fs_block_size != 0) {
+      LOG(WARNING) << Substitute(
+          "Found misaligned block in data file: $0\nRecord: $1\n"
+          "This is likely because of KUDU-1793",
+          data_file_->filename(), record.DebugString());
+    }
   }
+  return Status::OK();
 }
 
 Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
@@ -596,13 +621,10 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
   // will have written some garbage that can be expunged during a GC.
   RETURN_NOT_OK(block_manager()->SyncContainer(*this));
 
-  // Each call to AppendData() updated 'total_bytes_written_' to reflect the
-  // new block. Nevertheless, we must call UpdateBytesWritten() whenever a
-  // block is finished in order to prepare for the next block.
   CHECK(block_manager()->AddLogBlock(this, block->id(),
-                                     total_bytes_written_ - block->BytesAppended(),
+                                     total_bytes_written_,
                                      block->BytesAppended()));
-  UpdateBytesWritten(0);
+  UpdateBytesWritten(total_bytes_written_, block->BytesAppended());
 
   // Truncate the container if it's now full; any left over preallocated space
   // is no longer needed.
@@ -638,15 +660,15 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) {
   return Status::OK();
 }
 
-Status LogBlockContainer::AppendData(const Slice& data) {
-  RETURN_NOT_OK(EnsurePreallocated(data.size()));
-  RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data));
-  total_bytes_written_ += data.size();
+Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) {
+  DCHECK_GE(offset, total_bytes_written_);
+
+  RETURN_NOT_OK(data_file_->Write(offset, data));
 
   // This append may have changed the container size if:
   // 1. It was large enough that it blew out the preallocated space.
   // 2. Preallocation was disabled.
-  if (total_bytes_written_ > preallocated_offset_) {
+  if (offset + data.size() > preallocated_offset_) {
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
   }
   return Status::OK();
@@ -689,16 +711,19 @@ Status LogBlockContainer::SyncMetadata() {
   return Status::OK();
 }
 
-Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) {
+Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
+                                             size_t next_append_length) {
+  DCHECK_GE(block_start_offset, 0);
+
   if (!FLAGS_log_container_preallocate_bytes) {
     return Status::OK();
   }
 
   // If the last write blew out the preallocation window, or if the next write
   // exceeds it, we need to preallocate another chunk.
-  if (total_bytes_written_ > preallocated_offset_ ||
-      next_append_bytes > preallocated_offset_ - total_bytes_written_) {
-    int64_t off = std::max(preallocated_offset_, total_bytes_written_);
+  if (block_start_offset > preallocated_offset_ ||
+      next_append_length > preallocated_offset_ - block_start_offset) {
+    int64_t off = std::max(preallocated_offset_, block_start_offset);
     int64_t len = FLAGS_log_container_preallocate_bytes;
     RETURN_NOT_OK(data_file_->PreAllocate(off, len));
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
@@ -711,17 +736,28 @@ Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) {
   return Status::OK();
 }
 
-void LogBlockContainer::UpdateBytesWritten(int64_t more_bytes) {
-  DCHECK_GE(more_bytes, 0);
-
-  total_bytes_written_ += more_bytes;
+void LogBlockContainer::UpdateBytesWritten(int64_t block_offset, size_t block_length) {
+  DCHECK_GE(block_offset, 0);
 
+  // The log block manager maintains block contiguity as an invariant, which
+  // means accounting for the new block should be as simple as adding its
+  // length to 'total_bytes_written_'. However, due to KUDU-1793, some
+  // containers may have developed extra "holes" between blocks. We'll account
+  // for that by considering both the block's offset and its length.
+  //
   // The number of bytes is rounded up to the nearest filesystem block so
   // that each Kudu block is guaranteed to be on a filesystem block
   // boundary. This guarantees that the disk space can be reclaimed when
   // the block is deleted.
-  total_bytes_written_ = KUDU_ALIGN_UP(total_bytes_written_,
-                                       instance()->filesystem_block_size_bytes());
+  int64_t new_total_bytes = KUDU_ALIGN_UP(
+      block_offset + block_length, instance()->filesystem_block_size_bytes());
+  if (new_total_bytes < total_bytes_written_) {
+    LOG(WARNING) << Substitute(
+        "Container $0 unexpectedly tried to lower its size (from $1 to $2 "
+        "bytes), ignoring", ToString(), total_bytes_written_, new_total_bytes);
+  }
+  total_bytes_written_ = std::max(total_bytes_written_, new_total_bytes);
+
   if (full()) {
     VLOG(1) << Substitute(
         "Container $0 with size $1 is now full, max size is $2",
@@ -941,8 +977,11 @@ Status LogWritableBlock::Append(const Slice& data) {
   // whichever comes first. We can't do it now because the block's
   // length is still in flux.
 
+  int64_t cur_block_offset = block_offset_ + block_length_;
+  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data.size()));
+
   MicrosecondsInt64 start_time = GetMonoTimeMicros();
-  RETURN_NOT_OK(container_->AppendData(data));
+  RETURN_NOT_OK(container_->WriteData(cur_block_offset, data));
   MicrosecondsInt64 end_time = GetMonoTimeMicros();
 
   int64_t dur = end_time - start_time;
@@ -1561,7 +1600,12 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     UntrackedBlockMap blocks_in_container;
     uint64_t max_block_id = 0;
     for (const BlockRecordPB& r : records) {
-      ProcessBlockRecord(r, container.get(), &blocks_in_container);
+      s = ProcessBlockRecord(r, container.get(), &blocks_in_container);
+      if (!s.ok()) {
+        *result_status = s.CloneAndPrepend(Substitute(
+            "Could not process record in container $0", container->ToString()));
+        return;
+      }
       max_block_id = std::max(max_block_id, r.block_id().id());
     }
 
@@ -1606,19 +1650,18 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   *result_status = Status::OK();
 }
 
-void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
-                                         LogBlockContainer* container,
-                                         UntrackedBlockMap* block_map) {
+Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
+                                           LogBlockContainer* container,
+                                           UntrackedBlockMap* block_map) {
   BlockId block_id(BlockId::FromPB(record.block_id()));
   switch (record.op_type()) {
     case CREATE: {
       scoped_refptr<LogBlock> lb(new LogBlock(container, block_id,
                                               record.offset(), record.length()));
       if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        LOG(FATAL) << "Found duplicate CREATE record for block "
-                   << block_id.ToString() << " in container "
-                   << container->ToString() << ": "
-                   << record.DebugString();
+        return Status::Corruption(Substitute(
+            "found duplicate CREATE record for block $0 in container $1: $2",
+            block_id.ToString(), container->ToString(), record.DebugString()));
       }
 
       VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
@@ -1632,22 +1675,24 @@ void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       //
       // If we ignored deleted blocks, we would end up reusing the space
       // belonging to the last deleted block in the container.
-      container->UpdateBytesWritten(record.length());
+      container->UpdateBytesWritten(record.offset(), record.length());
       break;
     }
     case DELETE:
       if (block_map->erase(block_id) != 1) {
-        LOG(FATAL) << "Found DELETE record for invalid block "
-                   << block_id.ToString() << " in container "
-                   << container->ToString() << ": "
-                   << record.DebugString();
+        return Status::Corruption(Substitute(
+            "Found DELETE record for invalid block $0 in container $1: $2",
+            block_id.ToString(), container->ToString(), record.DebugString()));
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
       break;
     default:
-      LOG(FATAL) << "Found unknown op type in block record: "
-                 << record.DebugString();
+      return Status::Corruption(Substitute(
+          "Found unknown op type in container $0: $1",
+          container->ToString(), record.DebugString()));
   }
+
+  return Status::OK();
 }
 
 std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container)
{

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 817d8af..aaa983f 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -248,11 +248,13 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Parse a block record, adding or removing it in 'block_map', and
+  // Parses a block record, adding or removing it in 'block_map', and
   // accounting for it in the metadata for 'container'.
-  void ProcessBlockRecord(const BlockRecordPB& record,
-                          internal::LogBlockContainer* container,
-                          UntrackedBlockMap* block_map);
+  //
+  // Returns a bad status if the record is malformed in some way.
+  Status ProcessBlockRecord(const BlockRecordPB& record,
+                            internal::LogBlockContainer* container,
+                            UntrackedBlockMap* block_map);
 
   // Open a particular data directory belonging to the block manager.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 27e1ce8..e6b01f9 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -55,7 +55,7 @@
 
 DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
-TAG_FLAG(sys_catalog_fail_during_write, unsafe);
+TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
 using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
 using kudu::consensus::ConsensusMetadata;

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 2748413..a9b8751 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -296,6 +296,12 @@ class Env {
   // kernel) to be equal to the hard limit.
   virtual void IncreaseOpenFileLimit() = 0;
 
+  // Special string injected into file-growing operations' random failures
+  // (if enabled).
+  //
+  // Only useful for tests.
+  static const char* const kInjectedFailureStatusMsg;
+
  private:
   // No copying allowed
   Env(const Env&);

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 458cdd0..715207f 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -39,6 +39,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/monotime.h"
@@ -106,6 +107,10 @@ DEFINE_bool(never_fsync, false,
 TAG_FLAG(never_fsync, advanced);
 TAG_FLAG(never_fsync, unsafe);
 
+DEFINE_double(env_inject_io_error_on_write_or_preallocate, 0.0,
+              "Fraction of the time that write or preallocate operations will fail");
+TAG_FLAG(env_inject_io_error_on_write_or_preallocate, hidden);
+
 using base::subtle::Atomic64;
 using base::subtle::Barrier_AtomicIncrement;
 using std::string;
@@ -118,6 +123,8 @@ static Atomic64 cur_thread_local_id_;
 
 namespace kudu {
 
+const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE";
+
 namespace {
 
 #if defined(__APPLE__)
@@ -360,6 +367,9 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status PreAllocate(uint64_t size) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     uint64_t offset = std::max(filesize_, pre_allocated_size_);
@@ -453,6 +463,9 @@ class PosixWritableFile : public WritableFile {
 
   Status DoWritev(const vector<Slice>& data_vector,
                   size_t offset, size_t n) {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     ThreadRestrictions::AssertIOAllowed();
 #if defined(__linux__)
     DCHECK_LE(n, IOV_MAX);
@@ -560,6 +573,9 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     ThreadRestrictions::AssertIOAllowed();
     ssize_t written;
     RETRY_ON_EINTR(written, pwrite(fd_, data.data(), data.size(), offset));
@@ -581,6 +597,9 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status PreAllocate(uint64_t offset, size_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     if (fallocate(fd_, 0, offset, length) < 0) {


Mime
View raw message