kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: block_manager: various changes to disk space reservation checking
Date Fri, 04 Nov 2016 14:26:02 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 63059a903 -> 1f35e1eb5


block_manager: various changes to disk space reservation checking

I was trying to move log block container preallocation out of CreateBlock(),
but it was very difficult to reason about its behavior due to how wrapped up
it was with disk space checking. So here's some more yak shaving.

First, disk space checking is now done in DataDirManager. I like this for
several reasons:
- DataDir already models a data directory; disk space checking just adds a
  couple fields.
- It feels natural for DataDirManager to incorporate the "ignore full data
  directories" behavior into GetNextDataDir().
- It makes disk space checking available to all block managers.

Second, its semantics are now slightly different. We no longer check if
we're about to exceed the reserved space; instead, we just check if we've
already exceeded it. This is simpler but it preserves the "soft" nature of
the reservation behavior. The main advantage is that we can now run the
check from anywhere instead of just before a file grows, which makes it
easier to reason about and reduces the coupling on preallocation.

Third, the published metric is now a function of data directories, not
containers. I don't think a container-based metric is particularly useful
here because technically all containers hosted on a data directory are
"unavailable" if the underlying filesystem is full; it's more relevant to
count the data directory itself.

Change-Id: Ica2bb710a246ef09890554d1d6ff6da528145a6e
Reviewed-on: http://gerrit.cloudera.org:8080/4831
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1f35e1eb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1f35e1eb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1f35e1eb

Branch: refs/heads/master
Commit: 1f35e1eb507ec1bb0061497dde6c9798b35467c8
Parents: 63059a9
Author: Adar Dembo <adar@cloudera.com>
Authored: Mon Oct 24 17:42:12 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri Nov 4 14:04:35 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc               |  77 ++++++--
 src/kudu/fs/data_dirs.cc                        | 120 ++++++++++--
 src/kudu/fs/data_dirs.h                         |  51 ++++-
 src/kudu/fs/file_block_manager.cc               |   6 +-
 src/kudu/fs/log_block_manager.cc                | 189 ++++---------------
 src/kudu/fs/log_block_manager.h                 |  49 +----
 .../integration-tests/disk_reservation-itest.cc |  17 +-
 src/kudu/util/metrics.h                         |   1 +
 8 files changed, 270 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index b922eab..e780caf 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -53,7 +53,7 @@ DECLARE_uint64(log_container_max_size);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
 
-DECLARE_int32(log_block_manager_full_disk_cache_seconds);
+DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
 DECLARE_string(block_manager);
 
 // Generic block manager metrics.
@@ -70,6 +70,9 @@ METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 METRIC_DECLARE_counter(log_block_manager_containers);
 METRIC_DECLARE_counter(log_block_manager_full_containers);
 
+// Data directory metrics.
+METRIC_DECLARE_gauge_uint64(data_dirs_full);
+
 // The LogBlockManager is only supported on Linux, since it requires hole punching.
 #define RETURN_NOT_LOG_BLOCK_MANAGER() \
   do { \
@@ -753,7 +756,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
     ASSERT_OK(writer->Close());
   }
 
-  ASSERT_EQ(4, bm_->available_containers_.size());
+  ASSERT_EQ(4, bm_->all_containers_.size());
 
   // Delete the original blocks.
   for (const BlockId& b : block_ids) {
@@ -818,7 +821,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   // Start corrupting the metadata file in different ways.
 
-  string path = LogBlockManager::ContainerPathForTests(bm_->available_containers_.front());
+  string path = LogBlockManager::ContainerPathForTests(bm_->all_containers_.front());
   string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
   string data_path = path + LogBlockManager::kContainerDataFileSuffix;
 
@@ -926,7 +929,8 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   // Ensure that we only ever created a single container.
   ASSERT_EQ(1, bm_->all_containers_.size());
-  ASSERT_EQ(1, bm_->available_containers_.size());
+  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size());
+  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
 
   // Find location of 2nd record in metadata file and corrupt it.
   // This is an unrecoverable error because it's in the middle of the file.
@@ -979,26 +983,61 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 TEST_F(LogBlockManagerTest, TestDiskSpaceCheck) {
   RETURN_NOT_LOG_BLOCK_MANAGER();
 
-  FLAGS_log_block_manager_full_disk_cache_seconds = 0; // Don't cache device fullness.
+  // Reopen the block manager with metrics enabled.
+  MetricRegistry registry;
+  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
 
+  FLAGS_fs_data_dirs_full_disk_cache_seconds = 0; // Don't cache device fullness.
   FLAGS_fs_data_dirs_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
-  FLAGS_disk_reserved_bytes_free_for_testing = 0;
-  FLAGS_log_container_preallocate_bytes = 100;
-
-  vector<BlockId> created_blocks;
-  gscoped_ptr<WritableBlock> writer;
-  Status s = bm_->CreateBlock(&writer);
-  ASSERT_TRUE(s.IsIOError());
-  ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
 
-  FLAGS_disk_reserved_bytes_free_for_testing = 101;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  // Normally, a data dir is checked for fullness only after a block is closed;
+  // if it's now full, the next attempt at block creation will fail. Only when
+  // a data dir was last observed as full is it also checked before block creation.
+  //
+  // This behavior enforces a "soft" limit on disk space consumption but
+  // complicates testing somewhat.
+  bool data_dir_observed_full = false;
 
-  FLAGS_disk_reserved_bytes_free_for_testing = 0;
-  s = bm_->CreateBlock(&writer);
-  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  int i = 0;
+  for (int free_space : { 0, 2, 0 }) {
+    FLAGS_disk_reserved_bytes_free_for_testing = free_space;
 
-  ASSERT_OK(writer->Close());
+    for (int attempt = 0; attempt < 3; attempt++) {
+      gscoped_ptr<WritableBlock> writer;
+      LOG(INFO) << "Attempt #" << ++i;
+      Status s = bm_->CreateBlock(&writer);
+      if (FLAGS_disk_reserved_bytes_free_for_testing < FLAGS_fs_data_dirs_reserved_bytes) {
+        if (data_dir_observed_full) {
+          // The dir was previously observed as full, so CreateBlock() checked
+          // fullness again and failed.
+          ASSERT_TRUE(s.IsIOError());
+          ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
+        } else {
+          ASSERT_OK(s);
+          ASSERT_OK(writer->Close());
+
+          // The dir was not previously full so CreateBlock() did not check for
+          // fullness, but given the parameters of the test, we know that the
+          // dir was observed as full at Close().
+          data_dir_observed_full = true;
+        }
+        ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
+            entity->FindOrNull(METRIC_data_dirs_full).get())->value());
+      } else {
+        // CreateBlock() succeeded regardless of the previously fullness state,
+        // and the new state is definitely not full.
+        ASSERT_OK(s);
+        ASSERT_OK(writer->Close());
+        data_dir_observed_full = false;
+        ASSERT_EQ(0, down_cast<AtomicGauge<uint64_t>*>(
+            entity->FindOrNull(METRIC_data_dirs_full).get())->value());
+      }
+    }
+  }
 }
 
 } // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/data_dirs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 632f5b1..6a2b78c 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -42,7 +42,9 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/path_util.h"
@@ -50,6 +52,23 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 
+DEFINE_int64(fs_data_dirs_reserved_bytes, 0,
+             "Number of bytes to reserve on each data directory filesystem for non-Kudu usage. "
+             "Only works when --log_container_preallocate_bytes is non-zero.");
+TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
+TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
+
+DEFINE_int32(fs_data_dirs_full_disk_cache_seconds, 30,
+             "Number of seconds we cache the full-disk status in the block manager. "
+             "During this time, writes to the corresponding root path will not be attempted.");
+TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, advanced);
+TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, evolving);
+
+METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
+                           "Data Directories Full",
+                           kudu::MetricUnit::kDataDirectories,
+                           "Number of data directories whose disks are currently full");
+
 namespace kudu {
 
 namespace fs {
@@ -114,15 +133,25 @@ Status CheckHolePunch(Env* env, const string& path) {
 
 } // anonymous namespace
 
+#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
+DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
+  : GINIT(data_dirs_full)  {
+}
+#undef GINIT
+
+
 DataDir::DataDir(Env* env,
+                 DataDirMetrics* metrics,
                  string dir,
                  unique_ptr<PathInstanceMetadataFile> metadata_file,
                  unique_ptr<ThreadPool> pool)
     : env_(env),
+      metrics_(metrics),
       dir_(std::move(dir)),
       metadata_file_(std::move(metadata_file)),
       pool_(std::move(pool)),
-      is_shutdown_(false) {
+      is_shutdown_(false),
+      is_full_(false) {
 }
 
 DataDir::~DataDir() {
@@ -152,7 +181,51 @@ void DataDir::WaitOnClosures() {
   pool_->Wait();
 }
 
+Status DataDir::RefreshIsFull(RefreshMode mode) {
+  switch (mode) {
+    case RefreshMode::EXPIRED_ONLY: {
+      std::lock_guard<simple_spinlock> l(lock_);
+      DCHECK(last_check_is_full_.Initialized());
+      MonoTime expiry = last_check_is_full_ + MonoDelta::FromSeconds(
+          FLAGS_fs_data_dirs_full_disk_cache_seconds);
+      if (!is_full_ || MonoTime::Now() < expiry) {
+        break;
+      }
+      FALLTHROUGH_INTENDED; // Root was previously full, check again.
+    }
+    case RefreshMode::ALWAYS: {
+      Status s = env_util::VerifySufficientDiskSpace(
+          env_, dir_, 0, FLAGS_fs_data_dirs_reserved_bytes);
+      bool is_full_new;
+      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
+        LOG(WARNING) << Substitute(
+            "Insufficient disk space under path $0: creation of new data "
+            "blocks under this path can be retried after $1 seconds: $2",
+            dir_, FLAGS_fs_data_dirs_full_disk_cache_seconds, s.ToString());
+        s = Status::OK();
+        is_full_new = true;
+      } else {
+        is_full_new = false;
+      }
+      RETURN_NOT_OK(s); // Catch other types of IOErrors, etc.
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        if (metrics_ && is_full_ != is_full_new) {
+          metrics_->data_dirs_full->IncrementBy(is_full_new ? 1 : -1);
+        }
+        is_full_ = is_full_new;
+        last_check_is_full_ = MonoTime::Now();
+      }
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown check mode";
+  }
+  return Status::OK();
+}
+
 DataDirManager::DataDirManager(Env* env,
+                               scoped_refptr<MetricEntity> metric_entity,
                                string block_manager_type,
                                vector<string> paths)
     : env_(env),
@@ -160,6 +233,10 @@ DataDirManager::DataDirManager(Env* env,
       paths_(std::move(paths)),
       data_dirs_next_(0) {
   DCHECK_GT(paths_.size(), 0);
+
+  if (metric_entity) {
+    metrics_.reset(new DataDirMetrics(metric_entity));
+  }
 }
 
 DataDirManager::~DataDirManager() {
@@ -264,10 +341,13 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
 
     // Create the data directory in-memory structure itself.
     unique_ptr<DataDir> dd(new DataDir(
-        env_, p,
+        env_, metrics_.get(), p,
         unique_ptr<PathInstanceMetadataFile>(instance.release()),
         unique_ptr<ThreadPool>(pool.release())));
 
+    // Initialize the 'fullness' status of the data directory.
+    RETURN_NOT_OK(dd->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
+
     dds.emplace_back(std::move(dd));
     i++;
   }
@@ -303,16 +383,34 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
   return Status::OK();
 }
 
-DataDir* DataDirManager::GetNextDataDir() {
-  // Round robin through the data dirs.
-  int32_t cur_idx;
-  int32_t next_idx;
-  do {
-    cur_idx = data_dirs_next_.Load();
-    next_idx = (cur_idx + 1) % data_dirs_.size();
-  } while (!data_dirs_next_.CompareAndSet(cur_idx, next_idx));
+Status DataDirManager::GetNextDataDir(DataDir** dir) {
+  // Round robin through the data dirs, ignoring ones that are full.
+  unordered_set<DataDir*> full_dds;
+  while (true) {
+    int32_t cur_idx;
+    int32_t next_idx;
+    do {
+      cur_idx = data_dirs_next_.Load();
+      next_idx = (cur_idx + 1) % data_dirs_.size();
+    } while (!data_dirs_next_.CompareAndSet(cur_idx, next_idx));
+
+    DataDir* candidate = data_dirs_[cur_idx].get();
+    RETURN_NOT_OK(candidate->RefreshIsFull(
+        DataDir::RefreshMode::EXPIRED_ONLY));
+    if (!candidate->is_full()) {
+      *dir = candidate;
+      return Status::OK();
+    }
 
-  return data_dirs_[cur_idx].get();
+    // This data dir was full. If all are full, we can't satisfy the request.
+    full_dds.insert(candidate);
+    if (full_dds.size() == data_dirs_.size()) {
+      return Status::IOError(
+          "All data directories are full. Please free some disk space or "
+          "consider changing the fs_data_dirs_reserved_bytes configuration "
+          "parameter", "", ENOSPC);
+    }
+  }
 }
 
 DataDir* DataDirManager::FindDataDirByUuidIndex(uint16_t uuid_idx) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/data_dirs.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index 1402077..e8dc71a 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -18,27 +18,40 @@
 #pragma once
 
 #include <memory>
+#include <mutex>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include "kudu/gutil/callback_forward.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
+template<typename T>
+class AtomicGauge;
 class Env;
+class MetricEntity;
 class ThreadPool;
 
 namespace fs {
 class PathInstanceMetadataFile;
 
+struct DataDirMetrics {
+  explicit DataDirMetrics(const scoped_refptr<MetricEntity>& entity);
+
+  scoped_refptr<AtomicGauge<uint64_t>> data_dirs_full;
+};
+
 // Representation of a data directory in use by the block manager.
 class DataDir {
  public:
   DataDir(Env* env,
+          DataDirMetrics* metrics,
           std::string dir,
           std::unique_ptr<PathInstanceMetadataFile> metadata_file,
           std::unique_ptr<ThreadPool> pool);
@@ -57,20 +70,46 @@ class DataDir {
   // Waits for any outstanding closures submitted via ExecClosure() to finish.
   void WaitOnClosures();
 
+  // Tests whether the data directory is full by comparing the free space of
+  // its underlying filesystem with a predefined "reserved" space value.
+  //
+  // If 'mode' is EXPIRED_ONLY, performs the test only if the dir was last
+  // determined to be full some time ago. If 'mode' is ALWAYS, the test is
+  // performed regardless.
+  //
+  // Only returns a bad Status in the event of a real error; fullness is
+  // reflected via is_full().
+  enum class RefreshMode {
+    EXPIRED_ONLY,
+    ALWAYS,
+  };
+  Status RefreshIsFull(RefreshMode mode);
+
   const std::string& dir() const { return dir_; }
 
   const PathInstanceMetadataFile* instance() const {
     return metadata_file_.get();
   }
 
+  bool is_full() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return is_full_;
+  }
+
  private:
   Env* env_;
+  DataDirMetrics* metrics_;
   const std::string dir_;
   const std::unique_ptr<PathInstanceMetadataFile> metadata_file_;
   const std::unique_ptr<ThreadPool> pool_;
 
   bool is_shutdown_;
 
+  // Protects 'last_check_is_full_' and 'is_full_'.
+  mutable simple_spinlock lock_;
+  MonoTime last_check_is_full_;
+  bool is_full_;
+
   DISALLOW_COPY_AND_ASSIGN(DataDir);
 };
 
@@ -89,6 +128,7 @@ class DataDirManager {
   };
 
   DataDirManager(Env* env,
+                 scoped_refptr<MetricEntity> metric_entity,
                  std::string block_manager_type,
                  std::vector<std::string> paths);
   ~DataDirManager();
@@ -107,9 +147,12 @@ class DataDirManager {
   // 'max_data_dirs', or if 'mode' is MANDATORY and locks could not be taken.
   Status Open(int max_data_dirs, LockMode mode);
 
-  // Retrieves the next data directory. Directories are rotated
-  // via round-robin.
-  DataDir* GetNextDataDir();
+  // Retrieves the next data directory that isn't full. Directories are rotated
+  // via round-robin. Full directories are skipped.
+  //
+  // Returns an error if all data directories are full, or upon filesystem
+  // error. On success, 'dir' is guaranteed to be set.
+  Status GetNextDataDir(DataDir** dir);
 
   // Finds a data directory by uuid index, returning nullptr if it can't be
   // found.
@@ -131,6 +174,8 @@ class DataDirManager {
   const std::string block_manager_type_;
   const std::vector<std::string> paths_;
 
+  std::unique_ptr<DataDirMetrics> metrics_;
+
   std::vector<std::unique_ptr<DataDir>> data_dirs_;
 
   AtomicInt<int32_t> data_dirs_next_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 2a49eb1..5e85670 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "kudu/fs/block_manager_metrics.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
@@ -499,7 +500,7 @@ bool FileBlockManager::FindBlockPath(const BlockId& block_id,
 FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
   : env_(DCHECK_NOTNULL(env)),
     read_only_(opts.read_only),
-    dd_manager_(env, kBlockManagerType, opts.root_paths),
+    dd_manager_(env, opts.metric_entity, kBlockManagerType, opts.root_paths),
     rand_(GetRandomSeed32()),
     next_block_id_(rand_.Next64()),
     mem_tracker_(MemTracker::CreateTracker(-1,
@@ -535,7 +536,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                      gscoped_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
-  DataDir* dir = dd_manager_.GetNextDataDir();
+  DataDir* dir;
+  RETURN_NOT_OK(dd_manager_.GetNextDataDir(&dir));
   uint16_t uuid_idx;
   CHECK(dd_manager_.FindUuidIndexByDataDir(dir, &uuid_idx));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 4634e86..a480a94 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -67,19 +67,6 @@ DEFINE_bool(log_block_manager_test_hole_punching, true,
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
 TAG_FLAG(log_block_manager_test_hole_punching, unsafe);
 
-DEFINE_int32(log_block_manager_full_disk_cache_seconds, 30,
-             "Number of seconds we cache the full-disk status in the block manager. "
-             "During this time, writes to the corresponding root path will not be attempted.");
-TAG_FLAG(log_block_manager_full_disk_cache_seconds, advanced);
-TAG_FLAG(log_block_manager_full_disk_cache_seconds, evolving);
-
-DEFINE_int64(fs_data_dirs_reserved_bytes, 0,
-             "Number of bytes to reserve on each data directory filesystem for non-Kudu usage. "
-             "Only works with the log block manager and when --log_container_preallocate_bytes "
-             "is non-zero.");
-TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
-TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
-
 METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management,
                            "Bytes Under Management",
                            kudu::MetricUnit::kBytes,
@@ -106,7 +93,6 @@ METRIC_DEFINE_counter(server, log_block_manager_unavailable_containers,
                      "Number of non-full log block containers that are under root paths "
                       "whose disks are full");
 
-
 namespace kudu {
 
 namespace fs {
@@ -139,7 +125,6 @@ struct LogBlockManagerMetrics {
 
   scoped_refptr<Counter> containers;
   scoped_refptr<Counter> full_containers;
-  scoped_refptr<Counter> unavailable_containers;
 };
 
 #define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity))
@@ -149,8 +134,7 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
     GINIT(bytes_under_management),
     GINIT(blocks_under_management),
     MINIT(containers),
-    MINIT(full_containers),
-    MINIT(unavailable_containers) {
+    MINIT(full_containers) {
 }
 #undef GINIT
 #undef MINIT
@@ -282,6 +266,7 @@ class LogBlockContainer {
   }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   const DataDir* data_dir() const { return data_dir_; }
+  DataDir* mutable_data_dir() const { return data_dir_; }
  const PathInstanceMetadataPB* instance() const { return data_dir_->instance()->metadata(); }
 
  private:
@@ -1093,7 +1078,7 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
   : mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
                                            opts.parent_mem_tracker)),
-    dd_manager_(env, kBlockManagerType, opts.root_paths),
+    dd_manager_(env, opts.metric_entity, kBlockManagerType, opts.root_paths),
     blocks_by_block_id_(10,
                         BlockMap::hasher(),
                         BlockMap::key_equal(),
@@ -1187,83 +1172,19 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                     gscoped_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
-  // Root paths that are below their reserved space threshold. Initialize the
-  // paths from the FullDiskCache. This function-local cache is necessary for
-  // correctness in case the FullDiskCache expiration time is set to 0.
-  unordered_set<const DataDir*> full_root_paths(dd_manager_.data_dirs().size());
-  for (const auto& dd : dd_manager_.data_dirs()) {
-    if (full_disk_cache_.IsRootFull(dd.get())) {
-      InsertOrDie(&full_root_paths, dd.get());
-    }
-  }
-
   // Find a free container. If one cannot be found, create a new one.
-  // In case one or more root paths have hit their reserved space limit, we
-  // retry until we have exhausted all root paths.
   //
-  // TODO: should we cap the number of outstanding containers and force
-  // callers to block if we've reached it?
-  LogBlockContainer* container = nullptr;
-  while (!container) {
-    container = GetAvailableContainer(full_root_paths);
-    if (!container) {
-      // If all root paths are full, we cannot allocate a block.
-      if (full_root_paths.size() == dd_manager_.data_dirs().size()) {
-        return Status::IOError("Unable to allocate block: All data directories are full. "
-                               "Please free some disk space or consider changing the "
-                               "fs_data_dirs_reserved_bytes configuration parameter",
-                               "", ENOSPC);
-      }
-
-      DataDir* dir;
-      do {
-        dir = dd_manager_.GetNextDataDir();
-      } while (ContainsKey(full_root_paths, dir));
-      if (full_disk_cache_.IsRootFull(dir)) {
-        InsertOrDie(&full_root_paths, dir);
-        continue;
-      }
-
-      gscoped_ptr<LogBlockContainer> new_container;
-      RETURN_NOT_OK_PREPEND(LogBlockContainer::Create(this,
-                                                      dir,
-                                                      &new_container),
-                            "Could not create new log block container at " + dir->dir());
-      container = new_container.release();
-      {
-        std::lock_guard<simple_spinlock> l(lock_);
-        dirty_dirs_.insert(dir->dir());
-        AddNewContainerUnlocked(container);
-      }
-    }
-
-    // By preallocating with each CreateBlock(), we're effectively
-    // maintaining a rolling buffer of preallocated data just ahead of where
-    // the next write will fall.
-    if (FLAGS_log_container_preallocate_bytes) {
-      // TODO: The use of FLAGS_log_container_preallocate_bytes may be a poor
-      // estimate for the number of bytes we are about to consume for a block.
-      // In the future, we may also want to implement some type of "hard" limit
-      // to ensure that a giant block doesn't blow through the configured
-      // reserved disk space.
-      Status s = env_util::VerifySufficientDiskSpace(env_, container->DataFilePath(),
-                                                     FLAGS_log_container_preallocate_bytes,
-                                                     FLAGS_fs_data_dirs_reserved_bytes);
-      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
-        LOG(ERROR) << Substitute("Log block manager: Insufficient disk space under path $0: "
-                                 "Creation of new data blocks under this path can be retried after "
-                                 "$1 seconds: $2", container->data_dir()->dir(),
-                                 FLAGS_log_block_manager_full_disk_cache_seconds, s.ToString());
-        // Blacklist this root globally and locally.
-        full_disk_cache_.MarkRootFull(container->data_dir());
-        InsertOrDie(&full_root_paths, container->data_dir());
-        MakeContainerAvailable(container);
-        container = nullptr;
-        continue;
-      }
-      RETURN_NOT_OK(s); // Catch other types of IOErrors, etc.
-      RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
-    }
+  // TODO(unknown): should we cap the number of outstanding containers and
+  // force callers to block if we've reached it?
+  LogBlockContainer* container;
+  RETURN_NOT_OK(GetOrCreateContainer(&container));
+  if (FLAGS_log_container_preallocate_bytes) {
+    RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
+
+    // Preallocation succeeded and the container has grown; recheck its data
+    // directory fullness.
+    RETURN_NOT_OK(container->mutable_data_dir()->RefreshIsFull(
+        DataDir::RefreshMode::ALWAYS));
   }
 
   // Generate a free block ID.
@@ -1366,50 +1287,33 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   }
 }
 
-LogBlockContainer* LogBlockManager::GetAvailableContainer(
-    const unordered_set<const DataDir*>& full_root_paths) {
-  LogBlockContainer* container = nullptr;
-  int64_t disk_full_containers_delta = 0;
-  MonoTime now = MonoTime::Now();
+Status LogBlockManager::GetOrCreateContainer(LogBlockContainer** container) {
+  DataDir* dir;
+  RETURN_NOT_OK(dd_manager_.GetNextDataDir(&dir));
+
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    // Move containers from disk_full -> available.
-    while (!disk_full_containers_.empty() &&
-           disk_full_containers_.top().second < now) {
-      available_containers_.push_back(disk_full_containers_.top().first);
-      disk_full_containers_.pop();
-      disk_full_containers_delta -= 1;
-    }
-
-    // Return the first currently-available non-full-disk container (according to
-    // our full-disk cache).
-    while (!container && !available_containers_.empty()) {
-      container = available_containers_.front();
-      available_containers_.pop_front();
-      MonoTime expires;
-      // Note: We must check 'full_disk_cache_' before 'full_root_paths' in
-      // order to correctly use the expiry time provided by 'full_disk_cache_'.
-      if (full_disk_cache_.IsRootFull(container->data_dir(), &expires) ||
-          ContainsKey(full_root_paths, container->data_dir())) {
-        if (!expires.Initialized()) {
-          // It's no longer in the cache but we still consider it unusable.
-          // It will be moved back into 'available_containers_' on the next call.
-          expires = now;
-        }
-        disk_full_containers_.emplace(container, expires);
-        disk_full_containers_delta += 1;
-        container = nullptr;
-      }
+    auto& d = available_containers_by_data_dir_[DCHECK_NOTNULL(dir)];
+    if (!d.empty()) {
+      *container = d.front();
+      d.pop_front();
+      return Status::OK();
     }
   }
 
-  // Update the metrics in a batch.
-  if (metrics()) {
-    metrics()->unavailable_containers->IncrementBy(disk_full_containers_delta);
+  // All containers are in use; create a new one.
+  gscoped_ptr<LogBlockContainer> new_container;
+  RETURN_NOT_OK_PREPEND(LogBlockContainer::Create(this,
+                                                  dir,
+                                                  &new_container),
+                        "Could not create new log block container at " + dir->dir());
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    dirty_dirs_.insert(dir->dir());
+    AddNewContainerUnlocked(new_container.get());
   }
-
-  // Return the container we found, or null if we don't have anything available.
-  return container;
+  *container = new_container.release();
+  return Status::OK();
 }
 
 void LogBlockManager::MakeContainerAvailable(LogBlockContainer* container) {
@@ -1422,7 +1326,7 @@ void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainer* container
   if (container->full()) {
     return;
   }
-  available_containers_.push_back(container);
+  available_containers_by_data_dir_[container->data_dir()].push_back(container);
 }
 
 Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
@@ -1638,26 +1542,5 @@ std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer*
   return container->ToString();
 }
 
-bool FullDiskCache::IsRootFull(const DataDir* root_path, MonoTime* expires_out) const {
-  const MonoTime* expires;
-  {
-    shared_lock<rw_spinlock> l(lock_.get_lock());
-    expires = FindOrNull(cache_, root_path);
-  }
-  if (expires == nullptr) return false; // No entry exists.
-  if (*expires < MonoTime::Now()) return false; // Expired.
-  if (expires_out != nullptr) {
-    *expires_out = *expires;
-  }
-  return true; // Root is still full according to the cache.
-}
-
-void FullDiskCache::MarkRootFull(const DataDir* root_path) {
-  MonoTime expires = MonoTime::Now() +
-      MonoDelta::FromSeconds(FLAGS_log_block_manager_full_disk_cache_seconds);
-  std::lock_guard<percpu_rwlock> l(lock_);
-  InsertOrUpdate(&cache_, root_path, expires); // Last one wins.
-}
-
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 618af8a..2700610 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -21,7 +21,6 @@
 #include <deque>
 #include <gtest/gtest_prod.h>
 #include <memory>
-#include <queue>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -149,22 +148,6 @@ struct LogBlockManagerMetrics;
 // - Evaluate and implement a solution for data integrity (e.g. per-block
 //   checksum).
 
-// A thread-safe cache that indicates whether a root path is full or not.
-// Includes expiration of the items in the cache. Cache entries are never deleted.
-class FullDiskCache {
- public:
-  // Returns true if the given 'root_path' has been marked full and the
-  // associated cache entry has not expired.
-  bool IsRootFull(const DataDir* root_path, MonoTime* expires_out = nullptr) const;
-
-  // Marks the given 'root_path' as "full".
-  void MarkRootFull(const DataDir* root_path);
-
- private:
-  mutable percpu_rwlock lock_;
-  std::unordered_map<const DataDir*, MonoTime> cache_;
-};
-
 // The log-backed block manager.
 class LogBlockManager : public BlockManager {
  public:
@@ -216,29 +199,16 @@ class LogBlockManager : public BlockManager {
       BlockIdEqual,
       BlockAllocator> BlockMap;
 
-  typedef std::pair<internal::LogBlockContainer*, MonoTime> ExpiringContainerPair;
-
-  class ExpiringContainerPairGreaterThanFunctor {
-   public:
-    bool operator()(const ExpiringContainerPair& a, const ExpiringContainerPair& b) {
-      return b.second < a.second;
-    }
-  };
-
   // Adds an as of yet unseen container to this block manager.
   void AddNewContainerUnlocked(internal::LogBlockContainer* container);
 
   // Returns the next container available for writing using a round-robin
-  // selection policy, or null if no suitable container was found.
+  // selection policy, creating a new one if necessary.
   //
   // After returning, the container is considered to be in use. When
   // writing is finished, call MakeContainerAvailable() to make it
   // available to other writers.
-  //
-  // 'full_root_paths' is a blacklist containing root paths that are full.
-  // Containers with root paths in this list will not be returned.
-  internal::LogBlockContainer* GetAvailableContainer(
-      const std::unordered_set<const DataDir*>& full_root_paths);
+  Status GetOrCreateContainer(internal::LogBlockContainer** container);
 
   // Indicate that this container is no longer in use and can be handed out
   // to other writers.
@@ -326,15 +296,8 @@ class LogBlockManager : public BlockManager {
   // excluding containers that are either in use or full.
   //
   // Does not own the containers.
-  std::deque<internal::LogBlockContainer*> available_containers_;
-
-  // Holds only those containers that would be available, were they not on
-  // disks that are past their capacity. This priority queue consists of pairs
-  // of containers and timestamps. Those timestamps represent the next time
-  // that we should check whether the disk is full. The top of the priority
-  // queue is the lowest timestamp.
-  std::priority_queue<ExpiringContainerPair, std::vector<ExpiringContainerPair>,
-                      ExpiringContainerPairGreaterThanFunctor> disk_full_containers_;
+  std::unordered_map<const DataDir*,
+                     std::deque<internal::LogBlockContainer*>> available_containers_by_data_dir_;
 
   // Tracks dirty container directories.
   //
@@ -347,10 +310,6 @@ class LogBlockManager : public BlockManager {
   // If true, only read operations are allowed.
   const bool read_only_;
 
-  // A cache of which root paths are full as of the last time they were
-  // checked. This cache expires its entries after some period of time.
-  FullDiskCache full_disk_cache_;
-
   // For generating container names.
   ObjectIdGenerator oid_generator_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
index 77574bd..f5a6dd0 100644
--- a/src/kudu/integration-tests/disk_reservation-itest.cc
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -29,8 +29,8 @@ using strings::Substitute;
 DECLARE_string(block_manager);
 
 METRIC_DECLARE_entity(server);
+METRIC_DECLARE_gauge_uint64(data_dirs_full);
 METRIC_DECLARE_counter(log_block_manager_containers);
-METRIC_DECLARE_counter(log_block_manager_unavailable_containers);
 
 namespace kudu {
 
@@ -64,6 +64,9 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
   ts_flags.push_back("--flush_threshold_mb=0");
   ts_flags.push_back("--maintenance_manager_polling_interval_ms=50");
   ts_flags.push_back("--disable_core_dumps");
+  // Reserve one byte so that when we simulate 0 bytes free below, we'll start
+  // failing requests.
+  ts_flags.push_back("--fs_data_dirs_reserved_bytes=1");
   ts_flags.push_back(Substitute("--fs_data_dirs=$0/a,$0/b",
                                 GetTestDataDirectory()));
   ts_flags.push_back(Substitute("--disk_reserved_override_prefix_1_path_for_testing=$0/a",
@@ -102,17 +105,17 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
                               "disk_reserved_override_prefix_2_bytes_free_for_testing",
                               Substitute("$0", 1L * 1024 * 1024 * 1024)));
 
-  // Wait until we have 1 unusable container.
+  // Wait until we have one full data dir.
   while (true) {
-    int64_t num_unavailable_containers;
+    int64_t num_full_data_dirs;
     ASSERT_OK(GetTsCounterValue(cluster_->tablet_server(0),
-                                &METRIC_log_block_manager_unavailable_containers,
-                                &num_unavailable_containers));
-    if (num_unavailable_containers >= 1) break;
+                                &METRIC_data_dirs_full,
+                                &num_full_data_dirs));
+    if (num_full_data_dirs >= 1) break;
     SleepFor(MonoDelta::FromMilliseconds(10));
   }
 
-  LOG(INFO) << "Have 1 unavailable log block container";
+  LOG(INFO) << "Have 1 full data dir";
 
   // Now simulate that all disks are full.
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f35e1eb/src/kudu/util/metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index b9e739b..875bcd1 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -379,6 +379,7 @@ struct MetricUnit {
     kTasks,
     kMessages,
     kContextSwitches,
+    kDataDirectories,
   };
   static const char* Name(Type unit);
 };


Mime
View raw message