kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: fs: separate out directory management code
Date Mon, 16 Dec 2019 21:17:31 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf1b6d  fs: separate out directory management code
eaf1b6d is described below

commit eaf1b6d773e8b56413ddffa5be728a603f4c90e8
Author: Andrew Wong <awong@apache.org>
AuthorDate: Mon Dec 9 20:37:04 2019 -0800

    fs: separate out directory management code
    
    This splits directory management code out of the DataDirManager and into
    its own DirManager class that serves as the parent class of
    DataDirManager. All code related to initialization logic of the
    directory manager is moved into DirManager.
    
    Code that involves directory grouping is left to the DataDirManager,
    since I expect this to vary from directory manager to directory manager
    (e.g. I imagine that WALs will only use a single directory per tablet
    instead of being spread across multiple directories).
    
    I swapped from using DataDir everywhere to using the more generic Dir.
    The former implements the latter, but allows for the honoring and usage
    of the correct gflags (i.e. those defined for data directories).
    
    The Google C++ style guide recommends against using protected member
    variables, which I use in the DirManager base class. I think we've been
    somewhat permissive when it comes to following this rule, so I went with
    protected members to facilitate reuse.
    
    This patch only moves code around -- it doesn't change any existing
    logic.
    
    Change-Id: I633e1e32845b08eb24c5327a04af344b579b186a
    Reviewed-on: http://gerrit.cloudera.org:8080/14871
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/fs/CMakeLists.txt                   |   1 +
 src/kudu/fs/block_manager-stress-test.cc     |   5 +-
 src/kudu/fs/block_manager-test.cc            |  10 +-
 src/kudu/fs/data_dirs-test.cc                |  59 +-
 src/kudu/fs/data_dirs.cc                     | 842 ++++-----------------------
 src/kudu/fs/data_dirs.h                      | 405 ++-----------
 src/kudu/fs/dir_manager.cc                   | 693 ++++++++++++++++++++++
 src/kudu/fs/dir_manager.h                    | 436 ++++++++++++++
 src/kudu/fs/error_manager.h                  |   2 +-
 src/kudu/fs/file_block_manager.cc            |  57 +-
 src/kudu/fs/fs_manager-test.cc               |  47 +-
 src/kudu/fs/fs_manager.cc                    |  10 +-
 src/kudu/fs/fs_manager.h                     |   3 +-
 src/kudu/fs/log_block_manager-test.cc        |  31 +-
 src/kudu/fs/log_block_manager.cc             |  70 +--
 src/kudu/fs/log_block_manager.h              |  14 +-
 src/kudu/tools/kudu-tool-test.cc             |   3 +-
 src/kudu/tserver/tablet_copy_client-test.cc  |   6 +-
 src/kudu/tserver/tablet_copy_service-test.cc |   5 +-
 src/kudu/tserver/tablet_server-test.cc       |   4 +-
 src/kudu/tserver/ts_tablet_manager.cc        |   9 +-
 src/kudu/tserver/ts_tablet_manager.h         |   2 +-
 22 files changed, 1470 insertions(+), 1244 deletions(-)

diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index bab8d54..514935e 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -30,6 +30,7 @@ add_library(kudu_fs
   block_manager.cc
   block_manager_metrics.cc
   data_dirs.cc
+  dir_manager.cc
   dir_util.cc
   error_manager.cc
   file_block_manager.cc
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 27d4a5b..828c10a 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -28,7 +28,6 @@
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -160,7 +159,7 @@ class BlockManagerStressTest : public KuduTest {
     // If non-standard paths were provided we need to delete them in between
     // test runs.
     if (!FLAGS_block_manager_paths.empty()) {
-      for (const auto& dd : dd_manager_->GetDataRoots()) {
+      for (const auto& dd : dd_manager_->GetRoots()) {
         WARN_NOT_OK(env_->DeleteRecursively(dd),
                     Substitute("Couldn't recursively delete $0", dd));
       }
@@ -510,7 +509,7 @@ void BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
 
 template <>
 void BlockManagerStressTest<LogBlockManager>::InjectNonFatalInconsistencies() {
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), rand_seed_);
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), rand_seed_);
   ASSERT_OK(corruptor.Init());
 
   for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 31da823..1acb0d2 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -159,7 +159,7 @@ class BlockManagerTest : public KuduTest {
                         const shared_ptr<MemTracker>& parent_mem_tracker) {
     if (!dd_manager_) {
       DataDirManagerOptions opts;
-      opts.block_manager_type = block_manager_type<T>();
+      opts.dir_type = block_manager_type<T>();
       // Create a new directory manager if necessary.
       CHECK_OK(DataDirManager::CreateNewForTests(env_, { test_dir_ },
           opts, &dd_manager_));
@@ -180,14 +180,14 @@ class BlockManagerTest : public KuduTest {
     // manager first to enforce this.
     bm_.reset();
     DataDirManagerOptions opts;
-    opts.block_manager_type = block_manager_type<T>();
+    opts.dir_type = block_manager_type<T>();
     opts.metric_entity = metric_entity;
     if (create) {
       RETURN_NOT_OK(DataDirManager::CreateNewForTests(
-          env_, paths, std::move(opts), &dd_manager_));
+          env_, paths, opts, &dd_manager_));
     } else {
       RETURN_NOT_OK(DataDirManager::OpenExistingForTests(
-          env_, paths, std::move(opts), &dd_manager_));
+          env_, paths, opts, &dd_manager_));
     }
     bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker));
     RETURN_NOT_OK(bm_->Open(nullptr));
@@ -313,7 +313,7 @@ void BlockManagerTest<LogBlockManager>::RunBlockDistributionTest(const vector<st
 template <>
 void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>& paths) {
   // Ensure that each path has an instance file and that it's well-formed.
-  for (const string& path : dd_manager_->GetDataDirs()) {
+  for (const string& path : dd_manager_->GetDirs()) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(path, &children));
     ASSERT_EQ(3, children.size());
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 9fa9027..67672bd 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/fs/data_dirs.h"
+
 #include <cmath>
 #include <cstdint>
 #include <memory>
@@ -32,7 +34,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/fs/block_manager.h"
-#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
@@ -89,7 +91,7 @@ class DataDirsTest : public KuduTest {
     DataDirManagerOptions opts;
     opts.metric_entity = entity_;
     ASSERT_OK(DataDirManager::CreateNewForTests(
-        env_, GetDirNames(kNumDirs), std::move(opts), &dd_manager_));
+        env_, GetDirNames(kNumDirs), opts, &dd_manager_));
   }
 
  protected:
@@ -114,7 +116,7 @@ class DataDirsTest : public KuduTest {
 TEST_F(DataDirsTest, TestCreateGroup) {
   // Test that the DataDirManager doesn't know about the tablets we're about
   // to insert.
-  DataDir* dd = nullptr;
+  Dir* dd = nullptr;
   Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
   ASSERT_EQ(nullptr, dd);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
@@ -183,7 +185,7 @@ TEST_F(DataDirsTest, TestLoadFromPB) {
 
 TEST_F(DataDirsTest, TestDeleteDataDirGroup) {
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
-  DataDir* dd;
+  Dir* dd;
   ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
   ASSERT_FALSE(dd->is_full());
   dd_manager_->DeleteDataDirGroup(test_tablet_name_);
@@ -226,7 +228,7 @@ TEST_F(DataDirsTest, TestFullDiskGrowsGroup) {
   FLAGS_env_inject_full = 1.0;
 
   // Try getting a new directory, adding if necessary.
-  DataDir* new_dir;
+  Dir* new_dir;
   ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir));
   unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
   ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
@@ -259,7 +261,7 @@ TEST_F(DataDirsTest, TestGrowGroupInParallel) {
   const int kNumThreads = 32;
   vector<thread> threads;
   vector<Status> statuses(kNumThreads);
-  vector<DataDir*> dds(kNumThreads);
+  vector<Dir*> dds(kNumThreads);
   Barrier b(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
     threads.emplace_back([&, i] {
@@ -277,7 +279,7 @@ TEST_F(DataDirsTest, TestGrowGroupInParallel) {
   unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
   ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
   ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet + 1, data_dir_group.size());
-  DataDir* new_dir = dds[0];
+  Dir* new_dir = dds[0];
   ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
 
   // All returned data directories should have been the newly added one.
@@ -289,16 +291,16 @@ TEST_F(DataDirsTest, TestGrowGroupInParallel) {
 TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   FLAGS_fs_target_data_dirs_per_tablet = 2;
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
-  DataDir* dd;
-  DataDir* failed_dd;
+  Dir* dd;
+  Dir* failed_dd;
   int uuid_idx;
   // Fail one of the directories in the group and verify that it is not used.
   ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd));
-  ASSERT_TRUE(dd_manager_->FindUuidIndexByDataDir(failed_dd, &uuid_idx));
+  ASSERT_TRUE(dd_manager_->FindUuidIndexByDir(failed_dd, &uuid_idx));
   // These calls are idempotent.
-  ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
-  ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
-  ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
+  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
+  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
+  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
   ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
         entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
   for (int i = 0; i < 10; i++) {
@@ -307,8 +309,8 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   }
 
   // Fail the other directory and verify that neither will be used.
-  ASSERT_TRUE(dd_manager_->FindUuidIndexByDataDir(dd, &uuid_idx));
-  ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
+  ASSERT_TRUE(dd_manager_->FindUuidIndexByDir(dd, &uuid_idx));
+  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
   ASSERT_EQ(2, down_cast<AtomicGauge<uint64_t>*>(
         entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
   Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd);
@@ -320,7 +322,7 @@ TEST_F(DataDirsTest, TestFailedDirNotAddedToGroup) {
   // Fail one dir and create a group with all directories. The failed directory
   // shouldn't be in the group.
   FLAGS_fs_target_data_dirs_per_tablet = kNumDirs;
-  ASSERT_OK(dd_manager_->MarkDataDirFailed(0));
+  ASSERT_OK(dd_manager_->MarkDirFailed(0));
   ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
         entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
@@ -338,10 +340,10 @@ TEST_F(DataDirsTest, TestFailedDirNotAddedToGroup) {
   dd_manager_->DeleteDataDirGroup(test_tablet_name_);
 
   for (int i = 1; i < kNumDirs - 1; i++) {
-    ASSERT_OK(dd_manager_->MarkDataDirFailed(i));
+    ASSERT_OK(dd_manager_->MarkDirFailed(i));
   }
-  Status s = dd_manager_->MarkDataDirFailed(kNumDirs - 1);
-  ASSERT_STR_CONTAINS(s.ToString(), "All data dirs have failed");
+  Status s = dd_manager_->MarkDirFailed(kNumDirs - 1);
+  ASSERT_STR_CONTAINS(s.ToString(), "All dirs have failed");
   ASSERT_TRUE(s.IsIOError());
 
   s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
@@ -365,7 +367,7 @@ TEST_F(DataDirsTest, TestLoadBalancingDistribution) {
   double sum_squared_dev = 0;
   for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
     LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
-        dd_manager_->data_dir_by_uuid_idx_[e.first]->dir(), e.second.size());
+        dd_manager_->dir_by_uuid_idx_[e.first]->dir(), e.second.size());
     double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
     sum_squared_dev += deviation * deviation;
   }
@@ -427,7 +429,7 @@ TEST_F(DataDirsTest, TestLoadBalancingBias) {
       kNumAdditionalTablets * FLAGS_fs_target_data_dirs_per_tablet) / kNumDirs;
   for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
     LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
-        dd_manager_->data_dir_by_uuid_idx_[e.first]->dir(), e.second.size());
+        dd_manager_->dir_by_uuid_idx_[e.first]->dir(), e.second.size());
     double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
     sum_squared_dev += deviation * deviation;
   }
@@ -463,8 +465,7 @@ class DataDirManagerTest : public DataDirsTest {
   }
 
   Status OpenDataDirManager() {
-    return DataDirManager::OpenExistingForTests(env_, test_roots_,
-        DataDirManagerOptions(), &dd_manager_);
+    return DataDirManager::OpenExistingForTests(env_, test_roots_, {}, &dd_manager_);
   }
 
   virtual int GetNumDirs() const { return kNumDirs; }
@@ -482,8 +483,7 @@ TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
   for (const string& test_root : test_roots_) {
     ASSERT_OK(env_->CreateDir(test_root));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(
-      env_, test_roots_, DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_, {}, &dd_manager_));
 
   // Kill the first directory.
   FLAGS_crash_on_eio = false;
@@ -493,7 +493,7 @@ TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
   // The directory manager will successfully open with the single failed directory.
   ASSERT_OK(OpenDataDirManager());
   set<int> failed_dirs;
-  ASSERT_EQ(1, dd_manager_->GetFailedDataDirs().size());
+  ASSERT_EQ(1, dd_manager_->GetFailedDirs().size());
 
   // Now fail almost all of the other directories, leaving the last intact.
   for (int i = 1; i < kNumDirs - 1; i++) {
@@ -502,13 +502,13 @@ TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
   }
   // The directory manager should be aware of the new failures.
   ASSERT_OK(OpenDataDirManager());
-  ASSERT_EQ(kNumDirs - 1, dd_manager_->GetFailedDataDirs().size());
+  ASSERT_EQ(kNumDirs - 1, dd_manager_->GetFailedDirs().size());
 
   // Ensure that when there are no healthy data directories, the open will
   // yield an error.
   FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(test_roots_, "**"), ",");
   Status s = DataDirManager::OpenExistingForTests(env_, test_roots_,
-      DataDirManagerOptions(), &dd_manager_);
+      {}, &dd_manager_);
   ASSERT_STR_CONTAINS(s.ToString(), "could not open directory manager");
   ASSERT_TRUE(s.IsNotFound());
 }
@@ -526,8 +526,7 @@ TEST_F(TooManyDataDirManagerTest, TestTooManyInternedStrings) {
   for (const auto& r : test_roots_) {
     ASSERT_OK(env_->CreateDir(r));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_,
-      DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_, {}, &dd_manager_));
 }
 
 } // namespace fs
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index ddeb55f..92f8df1 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -21,11 +21,10 @@
 #include <cerrno>
 #include <cstddef>
 #include <cstdint>
-#include <iterator>
 #include <memory>
 #include <mutex>
-#include <ostream>
 #include <random>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -39,25 +38,17 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
-#include "kudu/gutil/bind.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/oid_generator.h"
-#include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
-#include "kudu/util/random_util.h"
-#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util_prod.h"
 #include "kudu/util/threadpool.h"
 
@@ -138,123 +129,18 @@ using strings::Substitute;
 using strings::SubstituteAndAppend;
 
 
-namespace {
-
-// Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for parallel
-// execution on a data directory's thread pool (which requires the return value
-// be void).
-void DeleteTmpFilesRecursively(Env* env, const string& path) {
-  WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(env, path),
-              "Error while deleting temp files");
-}
-
-} // anonymous namespace
-
 ////////////////////////////////////////////////////////////
 // DataDirMetrics
 ////////////////////////////////////////////////////////////
 
-#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
-DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
-  : GINIT(data_dirs_failed),
-    GINIT(data_dirs_full) {
+#define GINIT(member, x) member = METRIC_##x.Instantiate(metric_entity, 0)
+DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& metric_entity) {
+  GINIT(dirs_failed, data_dirs_failed);
+  GINIT(dirs_full, data_dirs_full);
 }
 #undef GINIT
 
 ////////////////////////////////////////////////////////////
-// DataDir
-////////////////////////////////////////////////////////////
-
-DataDir::DataDir(Env* env,
-                 DataDirMetrics* metrics,
-                 DataDirFsType fs_type,
-                 string dir,
-                 unique_ptr<DirInstanceMetadataFile> metadata_file,
-                 unique_ptr<ThreadPool> pool)
-    : env_(env),
-      metrics_(metrics),
-      fs_type_(fs_type),
-      dir_(std::move(dir)),
-      metadata_file_(std::move(metadata_file)),
-      pool_(std::move(pool)),
-      is_shutdown_(false),
-      is_full_(false),
-      available_bytes_(0) {
-}
-
-DataDir::~DataDir() {
-  Shutdown();
-}
-
-void DataDir::Shutdown() {
-  if (is_shutdown_) {
-    return;
-  }
-
-  WaitOnClosures();
-  pool_->Shutdown();
-  is_shutdown_ = true;
-}
-
-void DataDir::ExecClosure(const Closure& task) {
-  Status s = pool_->SubmitClosure(task);
-  if (!s.ok()) {
-    WARN_NOT_OK(
-        s, "Could not submit task to thread pool, running it synchronously");
-    task.Run();
-  }
-}
-
-void DataDir::WaitOnClosures() {
-  pool_->Wait();
-}
-
-Status DataDir::RefreshAvailableSpace(RefreshMode mode) {
-  switch (mode) {
-    case RefreshMode::EXPIRED_ONLY: {
-      std::lock_guard<simple_spinlock> l(lock_);
-      DCHECK(last_space_check_.Initialized());
-      MonoTime expiry = last_space_check_ + MonoDelta::FromSeconds(
-          FLAGS_fs_data_dirs_available_space_cache_seconds);
-      if (MonoTime::Now() < expiry) {
-        break;
-      }
-      FALLTHROUGH_INTENDED; // Root was previously full, check again.
-    }
-    case RefreshMode::ALWAYS: {
-      int64_t available_bytes_new;
-      Status s = env_util::VerifySufficientDiskSpace(
-          env_, dir_, 0, FLAGS_fs_data_dirs_reserved_bytes, &available_bytes_new);
-      bool is_full_new;
-      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
-        LOG(WARNING) << Substitute(
-            "Insufficient disk space under path $0: creation of new data "
-            "blocks under this path can be retried after $1 seconds: $2",
-            dir_, FLAGS_fs_data_dirs_available_space_cache_seconds, s.ToString());
-        s = Status::OK();
-        is_full_new = true;
-      } else {
-        is_full_new = false;
-      }
-      RETURN_NOT_OK_PREPEND(s, "Could not refresh fullness"); // Catch other types of IOErrors, etc.
-      {
-        std::lock_guard<simple_spinlock> l(lock_);
-        if (metrics_ && is_full_ != is_full_new) {
-          metrics_->data_dirs_full->IncrementBy(is_full_new ? 1 : -1);
-        }
-        is_full_ = is_full_new;
-        last_space_check_ = MonoTime::Now();
-        available_bytes_ = available_bytes_new;
-      }
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown check mode";
-  }
-  return Status::OK();
-}
-
-////////////////////////////////////////////////////////////
 // DataDirGroup
 ////////////////////////////////////////////////////////////
 
@@ -297,541 +183,147 @@ Status DataDirGroup::CopyToPB(const UuidByUuidIndexMap& uuid_by_uuid_idx,
 }
 
 ////////////////////////////////////////////////////////////
-// DataDirManagerOptions
+// DataDir
 ////////////////////////////////////////////////////////////
 
-DataDirManagerOptions::DataDirManagerOptions()
-    : block_manager_type(FLAGS_block_manager),
-      read_only(false),
-      update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {
-}
-
-////////////////////////////////////////////////////////////
-// DataDirManager
-////////////////////////////////////////////////////////////
+DataDir::DataDir(Env* env, DirMetrics* metrics, FsType fs_type, std::string dir,
+                 std::unique_ptr<DirInstanceMetadataFile> metadata_file,
+                 std::unique_ptr<ThreadPool> pool)
+    : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {}
 
-vector<string> DataDirManager::GetRootNames(const CanonicalizedRootsList& root_list) {
-  vector<string> roots;
-  std::transform(root_list.begin(), root_list.end(), std::back_inserter(roots),
-    [&] (const CanonicalizedRootAndStatus& r) { return r.path; });
-  return roots;
+std::unique_ptr<Dir> DataDirManager::CreateNewDir(
+    Env* env, DirMetrics* metrics, FsType fs_type,
+    std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
+    std::unique_ptr<ThreadPool> pool) {
+  return unique_ptr<Dir>(new DataDir(env, metrics, fs_type, std::move(dir),
+                                     std::move(metadata_file), std::move(pool)));
 }
 
-DataDirManager::DataDirManager(Env* env,
-                               DataDirManagerOptions opts,
-                               CanonicalizedRootsList canonicalized_data_roots)
-    : env_(env),
-      opts_(std::move(opts)),
-      canonicalized_data_fs_roots_(std::move(canonicalized_data_roots)),
-      rng_(GetRandomSeed32()) {
-  DCHECK_GT(canonicalized_data_fs_roots_.size(), 0);
-  DCHECK(opts_.update_instances == UpdateInstanceBehavior::DONT_UPDATE || !opts_.read_only);
-
-  if (opts_.metric_entity) {
-    metrics_.reset(new DataDirMetrics(opts_.metric_entity));
-  }
+int DataDir::available_space_cache_secs() const {
+  return FLAGS_fs_data_dirs_available_space_cache_seconds;
 }
 
-DataDirManager::~DataDirManager() {
-  Shutdown();
+int DataDir::reserved_bytes() const {
+  return FLAGS_fs_data_dirs_reserved_bytes;
 }
 
-void DataDirManager::WaitOnClosures() {
-  for (const auto& dd : data_dirs_) {
-    dd->WaitOnClosures();
-  }
-}
+////////////////////////////////////////////////////////////
+// DataDirManager
+////////////////////////////////////////////////////////////
 
-void DataDirManager::Shutdown() {
-  // We may be waiting here for a while on outstanding closures.
-  LOG_SLOW_EXECUTION(INFO, 1000,
-                     Substitute("waiting on $0 block manager thread pools",
-                                data_dirs_.size())) {
-    for (const auto& dd : data_dirs_) {
-      dd->Shutdown();
-    }
-  }
-}
+DataDirManagerOptions::DataDirManagerOptions()
+    : DirManagerOptions(FLAGS_block_manager) {}
 
-Status DataDirManager::OpenExistingForTests(Env* env, vector<string> data_fs_roots,
-                                            DataDirManagerOptions opts,
+DataDirManager::DataDirManager(Env* env,
+                               const DataDirManagerOptions& opts,
+                               CanonicalizedRootsList canonicalized_data_roots)
+    : DirManager(env, opts.metric_entity ?
+                          unique_ptr<DirMetrics>(new DataDirMetrics(opts.metric_entity)) : nullptr,
+                 FLAGS_fs_max_thread_count_per_data_dir,
+                 opts, std::move(canonicalized_data_roots)) {}
+
+Status DataDirManager::OpenExistingForTests(Env* env,
+                                            vector<string> data_fs_roots,
+                                            const DataDirManagerOptions& opts,
                                             unique_ptr<DataDirManager>* dd_manager) {
   CanonicalizedRootsList roots;
   for (const auto& r : data_fs_roots) {
     roots.push_back({ r, Status::OK() });
   }
-  return DataDirManager::OpenExisting(env, std::move(roots), std::move(opts), dd_manager);
+  return DataDirManager::OpenExisting(env, std::move(roots), opts, dd_manager);
 }
 
 Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
-                                    DataDirManagerOptions opts,
+                                    const DataDirManagerOptions& opts,
                                     unique_ptr<DataDirManager>* dd_manager) {
   unique_ptr<DataDirManager> dm;
-  dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
+  dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
   RETURN_NOT_OK(dm->Open());
   dd_manager->swap(dm);
   return Status::OK();
 }
 
 Status DataDirManager::CreateNewForTests(Env* env, vector<string> data_fs_roots,
-                                         DataDirManagerOptions opts,
+                                         const DataDirManagerOptions& opts,
                                          unique_ptr<DataDirManager>* dd_manager) {
   CanonicalizedRootsList roots;
   for (const auto& r : data_fs_roots) {
     roots.push_back({ r, Status::OK() });
   }
-  return DataDirManager::CreateNew(env, std::move(roots), std::move(opts), dd_manager);
+  return DataDirManager::CreateNew(env, std::move(roots), opts, dd_manager);
 }
 
 Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
-                                 DataDirManagerOptions opts,
+                                 const DataDirManagerOptions& opts,
                                  unique_ptr<DataDirManager>* dd_manager) {
   unique_ptr<DataDirManager> dm;
-  dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
+  dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
   RETURN_NOT_OK(dm->Create());
   RETURN_NOT_OK(dm->Open());
   dd_manager->swap(dm);
   return Status::OK();
 }
 
-Status DataDirManager::Create() {
-  CHECK(!opts_.read_only);
-
-  vector<string> all_uuids;
-  for (const auto& r : canonicalized_data_fs_roots_) {
-    RETURN_NOT_OK_PREPEND(r.status, "Could not create directory manager with disks failed");
-  }
-  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
-  bool has_existing_instances;
-  RETURN_NOT_OK(LoadInstances(&loaded_instances, &has_existing_instances));
-  if (has_existing_instances) {
-    return Status::AlreadyPresent("instance files already exist");
-  }
-
-  // If none of the instances exist, we can assume this is a new deployment and
-  // we should try creating some a new set of instance files.
-  RETURN_NOT_OK_PREPEND(CreateNewDirectoriesAndUpdateInstances(std::move(loaded_instances)),
-                        "could not create new data directories");
-  return Status::OK();
-}
-
-Status DataDirManager::CreateNewDirectoriesAndUpdateInstances(
-    vector<unique_ptr<DirInstanceMetadataFile>> instances) {
-  CHECK(!opts_.read_only);
-  CHECK_NE(UpdateInstanceBehavior::DONT_UPDATE, opts_.update_instances);
-
-  vector<string> created_dirs;
-  vector<string> created_files;
-  auto deleter = MakeScopedCleanup([&]() {
-    // Delete files first so that the directories will be empty when deleted.
-    for (const auto& f : created_files) {
-      WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f);
-    }
-    // Delete directories in reverse order since parent directories will have
-    // been added before child directories.
-    for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) {
-      WARN_NOT_OK(env_->DeleteDir(*it), "Could not delete dir " + *it);
-    }
-  });
-
-  // First, de-duplicate the instance UUIDs. If we have duplicates, something's
-  // wrong. Maybe an operator manually duplicated some instance files.
-  set<string> all_uuids;
-  for (const auto& instance : instances) {
-    InsertIfNotPresent(&all_uuids, instance->uuid());
-  }
-  if (all_uuids.size() != instances.size()) {
-    return Status::InvalidArgument(
-        Substitute("instance files contain duplicate UUIDs: $0 directories provided, "
-                   "$1 unique UUIDs found ($2)", instances.size(), all_uuids.size(),
-                   JoinStrings(all_uuids, ", ")));
-  }
-
-  // Determine which instance files are healthy (and can thus be updated), and
-  // which don't exist. Create any that don't exist.
-  //
-  // Note: we don't bother trying to create/update the instance if the file is
-  // otherwise unhealthy.
-  vector<unique_ptr<DirInstanceMetadataFile>> healthy_instances;
-  for (auto& instance : instances) {
-    if (instance->healthy()) {
-      healthy_instances.emplace_back(std::move(instance));
-      continue;
-    }
-    if (instance->health_status().IsNotFound()) {
-      bool created_dir = false;
-      RETURN_NOT_OK(instance->Create(all_uuids, &created_dir));
-      if (created_dir) {
-        created_dirs.emplace_back(instance->dir());
-      }
-      created_files.emplace_back(instance->path());
-    }
-  }
-
-  // Go through the healthy instances and look for instances that don't have
-  // the full complete set of instance UUIDs.
-  vector<unique_ptr<DirInstanceMetadataFile>> instances_to_update;
-  for (auto& instance : healthy_instances) {
-    DCHECK(instance->healthy());
-    const auto& dir_set = instance->metadata()->dir_set();
-    set<string> instance_uuids;
-    for (int i = 0; i < dir_set.all_uuids_size(); i++) {
-      InsertIfNotPresent(&instance_uuids, dir_set.all_uuids(i));
-    }
-    // If an instance file disagrees with the expected UUIDs, rewrite it.
-    if (all_uuids != instance_uuids) {
-      instances_to_update.emplace_back(std::move(instance));
-    }
-  }
-
-  // If any of the instance files need to be updated because they didn't match
-  // the expected set of UUIDs, update them now.
-  // Note: Having a consistent set of instance files isn't a correctness
-  // requirement, but it can be useful for degbugging.
-  if (!instances_to_update.empty()) {
-    RETURN_NOT_OK(UpdateHealthyInstances(instances_to_update, all_uuids));
-  }
-
-  // Ensure newly created directories are synchronized to disk.
-  if (FLAGS_enable_data_block_fsync) {
-    WARN_NOT_OK(env_util::SyncAllParentDirs(env_, created_dirs, created_files),
-                "could not sync newly created data directories");
-  }
-
-  // Success: don't delete any files.
-  deleter.cancel();
-  return Status::OK();
-}
-
-Status DataDirManager::UpdateHealthyInstances(
-    const vector<unique_ptr<DirInstanceMetadataFile>>& instances_to_update,
-    const set<string>& new_all_uuids) {
-  unordered_map<string, string> copies_to_restore;
-  unordered_set<string> copies_to_delete;
-  auto cleanup = MakeScopedCleanup([&] {
-    for (const auto& f : copies_to_delete) {
-      WARN_NOT_OK(env_->DeleteFile(f), Substitute("Could not delete file $0", f));
-    }
-    for (const auto& copy_and_original : copies_to_restore) {
-      const auto& copy_filename = copy_and_original.first;
-      const auto& original_filename = copy_and_original.second;
-      WARN_NOT_OK(env_->RenameFile(copy_filename, original_filename),
-          Substitute("Could not restore file $0 from $1", original_filename, copy_filename));
-    }
-  });
-  // Make a copy of every existing instance metadata file. This is done before
-  // performing any updates, so that if there's a failure while copying,
-  // there's no metadata to restore.
-  //
-  // We'll keep track of the copies so we can delete them on success, or use
-  // them to restore on failure.
-  WritableFileOptions opts;
-  opts.sync_on_close = true;
-  for (const auto& instance : instances_to_update) {
-    if (!instance->healthy()) {
-      continue;
-    }
-    const string& instance_filename = instance->path();
-    string copy_filename = instance_filename + kTmpInfix;
-    Status s = env_util::CopyFile(env_, instance_filename, copy_filename, opts);
-    if (PREDICT_FALSE(!s.ok())) {
-      s = s.CloneAndPrepend("unable to backup existing instance file");
-      instance->SetInstanceFailed(s);
-      LOG(WARNING) << s.ToString();
-      continue;
-    }
-    InsertOrDie(&copies_to_delete, copy_filename);
-  }
-
-  // Update the instance metadata files with the new set of UUIDs.
-  for (const auto& instance : instances_to_update) {
-    if (!instance->healthy()) {
-      continue;
-    }
-    const string& instance_filename = instance->path();
-    string copy_filename = instance_filename + kTmpInfix;
-
-    // Put together the PB and perform the update.
-    DirInstanceMetadataPB new_pb = *instance->metadata();
-    new_pb.mutable_dir_set()->mutable_all_uuids()->Clear();
-    for (const auto& uuid : new_all_uuids) {
-      new_pb.mutable_dir_set()->add_all_uuids(uuid);
-    }
-
-    // We're about to update the file; if we fail midway, we should try to
-    // restore them from our backups if we can.
-    InsertOrDie(&copies_to_restore, copy_filename, instance_filename);
-    CHECK_EQ(1, copies_to_delete.erase(copy_filename));
-    Status s = pb_util::WritePBContainerToPath(
-        env_, instance_filename, new_pb, pb_util::OVERWRITE,
-        FLAGS_enable_data_block_fsync ? pb_util::SYNC : pb_util::NO_SYNC);
-    // We've failed to update for some reason, so restore our original file.
-    // Since we're renaming our copy, we don't have to delete it.
-    if (PREDICT_FALSE(!s.ok())) {
-      s = s.CloneAndPrepend("unable to update instance file");
-      instance->SetInstanceFailed(s);
-      LOG(WARNING) << Substitute("unable to overwrite existing instance file $0: $1",
-                                 instance_filename, s.ToString());
-    }
-  }
-
-  // If we are not tolerating errors (e.g. we're running the update_dirs tool)
-  // and we've hit an error, return now and clean up what we've changed.
-  if (opts_.update_instances == UpdateInstanceBehavior::UPDATE_AND_ERROR_ON_FAILURE) {
-    for (const auto& instance : instances_to_update) {
-      RETURN_NOT_OK_PREPEND(instance->health_status(),
-          "at least one instance file failed to update");
-    }
-  }
-
-  // Success; we only need to delete our copies.
-  InsertKeysFromMap(copies_to_restore, &copies_to_delete);
-  copies_to_restore.clear();
-  return Status::OK();
+bool DataDirManager::sync_dirs() const {
+  return FLAGS_enable_data_block_fsync;
 }
 
-Status DataDirManager::LoadInstances(
-    vector<unique_ptr<DirInstanceMetadataFile>>* instance_files,
-    bool* has_existing_instances) {
-  LockMode lock_mode;
-  if (!FLAGS_fs_lock_data_dirs) {
-    lock_mode = LockMode::NONE;
-  } else if (opts_.read_only) {
-    lock_mode = LockMode::OPTIONAL;
-  } else {
-    lock_mode = LockMode::MANDATORY;
-  }
-  vector<string> missing_roots_tmp;
-  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
-  ObjectIdGenerator gen;
-  for (int i = 0; i < canonicalized_data_fs_roots_.size(); i++) {
-    const auto& root = canonicalized_data_fs_roots_[i];
-    string data_dir = JoinPathSegments(root.path, kDataDirName);
-    string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
-
-    // Initialize the instance with a backup UUID. In case the load fails, this
-    // will be the UUID for our instnace.
-    string backup_uuid = gen.Next();
-    unique_ptr<DirInstanceMetadataFile> instance(
-        new DirInstanceMetadataFile(env_, std::move(backup_uuid), opts_.block_manager_type,
-                                     instance_filename));
-    if (PREDICT_FALSE(!root.status.ok())) {
-      instance->SetInstanceFailed(root.status);
-    } else {
-      // This may return OK and mark 'instance' as unhealthy if the file could
-      // not be loaded (e.g. not found, disk errors).
-      RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
-                            Substitute("could not load $0", instance_filename));
-    }
-
-    // Try locking the instance.
-    if (instance->healthy() && lock_mode != LockMode::NONE) {
-      // This may return OK and mark 'instance' as unhealthy if the file could
-      // not be locked due to non-locking issues (e.g. disk errors).
-      Status s = instance->Lock();
-      if (!s.ok()) {
-        if (lock_mode == LockMode::OPTIONAL) {
-          LOG(WARNING) << s.ToString();
-          LOG(WARNING) << "Proceeding without lock";
-        } else {
-          DCHECK(LockMode::MANDATORY == lock_mode);
-          return s;
-        }
-      }
-    }
-    loaded_instances.emplace_back(std::move(instance));
-  }
-
-  int num_healthy_instances = 0;
-  for (const auto& instance : loaded_instances) {
-    if (instance->healthy()) {
-      num_healthy_instances++;
-    }
-  }
-  if (has_existing_instances) {
-    *has_existing_instances = num_healthy_instances > 0;
-  }
-  instance_files->swap(loaded_instances);
-  return Status::OK();
+bool DataDirManager::lock_dirs() const {
+  return FLAGS_fs_lock_data_dirs;
 }
 
-Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<DataDir>>& dds) {
-  // Helper lambda to add a directory to the maps.
-  const auto insert_to_maps = [&] (const string& uuid, int idx, DataDir* dd) {
-    if (!dd->instance()->healthy()) {
-      if (metrics_) {
-        metrics_->data_dirs_failed->IncrementBy(1);
-      }
-      InsertOrDie(&failed_data_dirs_, idx);
-    }
-    InsertOrDie(&uuid_by_root_, DirName(dd->dir()), uuid);
-    InsertOrDie(&uuid_by_idx_, idx, uuid);
-    InsertOrDie(&idx_by_uuid_, uuid, idx);
-    InsertOrDie(&data_dir_by_uuid_idx_, idx, dd);
-    InsertOrDie(&uuid_idx_by_data_dir_, dd, idx);
-    InsertOrDie(&tablets_by_uuid_idx_map_, idx, {});
-  };
-
-  if (opts_.block_manager_type == "file") {
-    // When assigning directories for the file block manager, the UUID indexes
-    // must match what exists in the instance files' list of UUIDs.
-    unordered_map<string, int> uuid_to_idx;
-    for (const auto& dd : dds) {
-      // Find a healthy instance file and use its set of UUIDs.
-      if (dd->instance()->healthy()) {
-        const auto& dir_set = dd->instance()->metadata()->dir_set();
-        VLOG(1) << Substitute("using dir set $0 as reference: $1",
-            dd->instance()->path(), pb_util::SecureDebugString(dir_set));
-        for (int idx = 0; idx < dir_set.all_uuids_size(); idx++) {
-          const string& uuid = dir_set.all_uuids(idx);
-          InsertIfNotPresent(&uuid_to_idx, uuid, idx);
-        }
-        break;
-      }
-    }
-    // We should have the same number of UUID assignments as directories.
-    if (dds.size() != uuid_to_idx.size()) {
-      return Status::Corruption(
-          Substitute("instance file is corrupted: $0 unique UUIDs expected, got $1",
-                     dds.size(), uuid_to_idx.size()));
-    }
-    // Keep track of any dirs that were not referenced in the dir set. These
-    // are presumably from instance files we failed to read. We'll assign them
-    // indexes of those that remain.
-    vector<DataDir*> unassigned_dirs;
-    for (const auto& dd : dds) {
-      const auto& uuid = dd->instance()->uuid();
-      int* idx = FindOrNull(uuid_to_idx, uuid);
-      if (idx) {
-        insert_to_maps(uuid, *idx, dd.get());
-        uuid_to_idx.erase(uuid);
-      } else {
-        LOG(WARNING) << Substitute("instance $0 has unknown UUID $1",
-                                   dd->instance()->path(), uuid);
-        unassigned_dirs.emplace_back(dd.get());
-      }
-    }
-    DCHECK_EQ(unassigned_dirs.size(), uuid_to_idx.size());
-    int unassigned_dir_idx = 0;
-    for (const auto& failed_uuid_and_idx : uuid_to_idx) {
-      insert_to_maps(failed_uuid_and_idx.first, failed_uuid_and_idx.second,
-                     unassigned_dirs[unassigned_dir_idx++]);
-    }
-  } else {
-    // Go through our instances and assign them each a UUID index.
-    for (int idx = 0; idx < dds.size(); idx++) {
-      DataDir* dd = dds[idx].get();
-      insert_to_maps(dd->instance()->uuid(), idx, dd);
-    }
-  }
-  return Status::OK();
+int DataDirManager::max_dirs() const {
+  return opts_.dir_type == "file" ? (1 << 16) - 1 : kint32max;
 }
 
-Status DataDirManager::Open() {
-  const int kMaxDataDirs = opts_.block_manager_type == "file" ? (1 << 16) - 1 : kint32max;
-  if (canonicalized_data_fs_roots_.size() > kMaxDataDirs) {
-    return Status::InvalidArgument(Substitute("too many directories provided $0, max is $1",
-                                              canonicalized_data_fs_roots_.size(), kMaxDataDirs));
-  }
-
-  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
-  // Load the instance files from disk.
-  bool has_existing_instances;
-  RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances),
-      "failed to load instance files");
-  if (!has_existing_instances) {
-    return Status::NotFound(
-        "could not open directory manager, no healthy data directories found");
+Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& dirs) {
+  if (opts_.dir_type == "log") {
+    return DirManager::PopulateDirectoryMaps(dirs);
   }
-  // Note: the file block manager should not be updated because its block
-  // indexing algorithm depends on a fixed set of directories.
-  if (!opts_.read_only && opts_.block_manager_type != "file" &&
-      opts_.update_instances != UpdateInstanceBehavior::DONT_UPDATE) {
-    RETURN_NOT_OK_PREPEND(
-        CreateNewDirectoriesAndUpdateInstances(
-            std::move(loaded_instances)),
-            "could not add new data directories");
-    RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances),
-                          "failed to load instance files after updating");
-    if (!has_existing_instances) {
-      return Status::IOError(
-          "could not open directory manager, no healthy data directories found");
-    }
-  }
-
-  // All instances are present and accounted for. Time to create the in-memory
-  // data directory structures.
-  vector<unique_ptr<DataDir>> dds;
-  for (int i = 0; i < loaded_instances.size(); i++) {
-    auto& instance = loaded_instances[i];
-    const string data_dir = instance->dir();
-
-    // Figure out what filesystem the data directory is on.
-    DataDirFsType fs_type = DataDirFsType::OTHER;
-    if (instance->healthy()) {
-      bool result = false;
-      Status fs_check = env_->IsOnExtFilesystem(data_dir, &result);
-      if (fs_check.ok()) {
-        if (result) {
-          fs_type = DataDirFsType::EXT;
-        } else {
-          fs_check = env_->IsOnXfsFilesystem(data_dir, &result);
-          if (fs_check.ok() && result) {
-            fs_type = DataDirFsType::XFS;
-          }
-        }
-      }
-      // If we hit a disk error, consider the directory failed.
-      if (PREDICT_FALSE(fs_check.IsDiskFailure())) {
-        instance->SetInstanceFailed(fs_check.CloneAndPrepend("failed to check FS type"));
-      } else {
-        RETURN_NOT_OK(fs_check);
+  DCHECK_EQ("file", opts_.dir_type);
+  // When assigning directories for the file block manager, the UUID indexes
+  // must match what exists in the instance files' list of UUIDs.
+  unordered_map<string, int> uuid_to_idx;
+  for (const auto& dd : dirs) {
+    // Find a healthy instance file and use its set of UUIDs.
+    if (dd->instance()->healthy()) {
+      const auto& dir_set = dd->instance()->metadata()->dir_set();
+      VLOG(1) << Substitute("using dir set $0 as reference: $1",
+          dd->instance()->path(), pb_util::SecureDebugString(dir_set));
+      for (int idx = 0; idx < dir_set.all_uuids_size(); idx++) {
+        const string& uuid = dir_set.all_uuids(idx);
+        InsertIfNotPresent(&uuid_to_idx, uuid, idx);
       }
+      break;
     }
-
-    // Create a per-dir thread pool.
-    unique_ptr<ThreadPool> pool;
-    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
-                  .set_max_threads(FLAGS_fs_max_thread_count_per_data_dir)
-                  .set_trace_metric_prefix("data dirs")
-                  .Build(&pool));
-    unique_ptr<DataDir> dd(new DataDir(
-        env_, metrics_.get(), fs_type, data_dir, std::move(instance),
-        std::move(pool)));
-    dds.emplace_back(std::move(dd));
   }
-
-  // Use the per-dir thread pools to delete temporary files in parallel.
-  for (const auto& dd : dds) {
-    if (dd->instance()->healthy()) {
-      dd->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dd->dir()));
+  // We should have the same number of UUID assignments as directories.
+  if (dirs.size() != uuid_to_idx.size()) {
+    return Status::Corruption(
+        Substitute("instance file is corrupted: $0 unique UUIDs expected, got $1",
+                    dirs.size(), uuid_to_idx.size()));
+  }
+  // Keep track of any dirs that were not referenced in the dir set. These
+  // are presumably from instance files we failed to read. We'll assign them
+  // indexes of those that remain.
+  vector<Dir*> unassigned_dirs;
+  for (const auto& dd : dirs) {
+    const auto& uuid = dd->instance()->uuid();
+    int* idx = FindOrNull(uuid_to_idx, uuid);
+    if (idx) {
+      InsertToMaps(uuid, *idx, dd.get());
+      uuid_to_idx.erase(uuid);
+    } else {
+      LOG(WARNING) << Substitute("instance $0 has unknown UUID $1",
+                                  dd->instance()->path(), uuid);
+      unassigned_dirs.emplace_back(dd.get());
     }
   }
-  for (const auto& dd : dds) {
-    dd->WaitOnClosures();
-  }
-
-  RETURN_NOT_OK(PopulateDirectoryMaps(dds));
-  data_dirs_ = std::move(dds);
-
-  // From this point onwards, the in-memory maps are the source of truth about
-  // the state of each data dir.
-
-  // Initialize the 'fullness' status of the data directories.
-  for (const auto& dd : data_dirs_) {
-    int uuid_idx;
-    CHECK(FindUuidIndexByDataDir(dd.get(), &uuid_idx));
-    if (ContainsKey(failed_data_dirs_, uuid_idx)) {
-      continue;
-    }
-    Status refresh_status = dd->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
-    if (PREDICT_FALSE(!refresh_status.ok())) {
-      if (refresh_status.IsDiskFailure()) {
-        RETURN_NOT_OK(MarkDataDirFailed(uuid_idx, refresh_status.ToString()));
-        continue;
-      }
-      return refresh_status;
-    }
+  DCHECK_EQ(unassigned_dirs.size(), uuid_to_idx.size());
+  int unassigned_dir_idx = 0;
+  for (const auto& failed_uuid_and_idx : uuid_to_idx) {
+    InsertToMaps(failed_uuid_and_idx.first, failed_uuid_and_idx.second,
+                    unassigned_dirs[unassigned_dir_idx++]);
   }
   return Status::OK();
 }
@@ -866,20 +358,20 @@ Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
   // Adjust the disk group size to fit within the total number of data dirs.
   int group_target_size;
   if (FLAGS_fs_target_data_dirs_per_tablet == 0) {
-    group_target_size = data_dirs_.size();
+    group_target_size = dirs_.size();
   } else {
     group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet,
-                                 static_cast<int>(data_dirs_.size()));
+                                 static_cast<int>(dirs_.size()));
   }
   vector<int> group_indices;
   if (mode == DirDistributionMode::ACROSS_ALL_DIRS) {
     // If using all dirs, add all regardless of directory state.
-    AppendKeysFromMap(data_dir_by_uuid_idx_, &group_indices);
+    AppendKeysFromMap(dir_by_uuid_idx_, &group_indices);
   } else {
     // Randomly select directories, giving preference to those with fewer tablets.
-    if (PREDICT_FALSE(!failed_data_dirs_.empty())) {
+    if (PREDICT_FALSE(!failed_dirs_.empty())) {
       group_target_size = std::min(group_target_size,
-          static_cast<int>(data_dirs_.size()) - static_cast<int>(failed_data_dirs_.size()));
+          static_cast<int>(dirs_.size()) - static_cast<int>(failed_dirs_.size()));
 
       // A size of 0 would indicate no healthy disks, which should crash the server.
       DCHECK_GE(group_target_size, 0);
@@ -894,10 +386,10 @@ Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
     if (PREDICT_FALSE(group_indices.size() < FLAGS_fs_target_data_dirs_per_tablet)) {
       string msg = Substitute("Could only allocate $0 dirs of requested $1 for tablet "
                               "$2. $3 dirs total", group_indices.size(),
-                              FLAGS_fs_target_data_dirs_per_tablet, tablet_id, data_dirs_.size());
+                              FLAGS_fs_target_data_dirs_per_tablet, tablet_id, dirs_.size());
       if (metrics_) {
         SubstituteAndAppend(&msg, ", $0 dirs full, $1 dirs failed",
-                            metrics_->data_dirs_full->value(), metrics_->data_dirs_failed->value());
+                            metrics_->dirs_full->value(), metrics_->dirs_failed->value());
       }
       LOG(INFO) << msg;
     }
@@ -909,7 +401,7 @@ Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
   return Status::OK();
 }
 
-Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, Dir** dir,
                                       int* new_target_group_size) const {
   shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
   vector<int> healthy_uuid_indices;
@@ -927,12 +419,12 @@ Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir**
     // natural tablet_id, select a data dir from any of the directories.
     CHECK(IsGTest());
     vector<int> all_uuid_indices;
-    AppendKeysFromMap(data_dir_by_uuid_idx_, &all_uuid_indices);
+    AppendKeysFromMap(dir_by_uuid_idx_, &all_uuid_indices);
     group_for_tests = DataDirGroup(std::move(all_uuid_indices));
     group = &group_for_tests;
   }
   // Within a given directory group, filter out the ones that are failed.
-  if (PREDICT_TRUE(failed_data_dirs_.empty())) {
+  if (PREDICT_TRUE(failed_dirs_.empty())) {
     healthy_uuid_indices = group->uuid_indices();
   } else {
     RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &healthy_uuid_indices);
@@ -942,10 +434,10 @@ Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir**
     }
   }
   // Within a given directory group, filter out the ones that are full.
-  vector<DataDir*> candidate_dirs;
+  vector<Dir*> candidate_dirs;
   for (auto uuid_idx : healthy_uuid_indices) {
-    DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
-    Status s = candidate->RefreshAvailableSpace(DataDir::RefreshMode::EXPIRED_ONLY);
+    Dir* candidate = FindOrDie(dir_by_uuid_idx_, uuid_idx);
+    Status s = candidate->RefreshAvailableSpace(Dir::RefreshMode::EXPIRED_ONLY);
     WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir()));
     if (s.ok() && !candidate->is_full()) {
       candidate_dirs.emplace_back(candidate);
@@ -958,7 +450,7 @@ Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir**
     size_t num_total = group->uuid_indices().size();
     size_t num_full = 0;
     for (const auto& idx : group->uuid_indices()) {
-      if (FindOrDie(data_dir_by_uuid_idx_, idx)->is_full()) {
+      if (FindOrDie(dir_by_uuid_idx_, idx)->is_full()) {
         num_full++;
       }
     }
@@ -1008,7 +500,7 @@ Status DataDirManager::GetDataDirGroupPB(const string& tablet_id,
   return Status::OK();
 }
 
-Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir) {
+Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, Dir** dir) {
   int new_target_group_size = 0;
   Status s = GetDirForBlock(opts, dir, &new_target_group_size);
   if (PREDICT_TRUE(s.ok())) {
@@ -1034,17 +526,17 @@ Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, Data
   // If we're already at the new target group size (e.g. because another
   // thread has added a directory), just return the newly added directory.
   if (new_target_group_size <= group.uuid_indices().size()) {
-    *dir = FindOrDie(data_dir_by_uuid_idx_, group.uuid_indices().back());
+    *dir = FindOrDie(dir_by_uuid_idx_, group.uuid_indices().back());
     return Status::OK();
   }
   vector<int> group_uuid_indices = group.uuid_indices();
   GetDirsForGroupUnlocked(new_target_group_size, &group_uuid_indices);
   if (PREDICT_FALSE(group_uuid_indices.size() < new_target_group_size)) {
     // If we couldn't add to the group, return an error.
-    int num_total = data_dirs_.size();
-    int num_failed = failed_data_dirs_.size();
+    int num_total = dirs_.size();
+    int num_failed = failed_dirs_.size();
     int num_full = 0;
-    for (const auto& dd : data_dirs_) {
+    for (const auto& dd : dirs_) {
       if (dd->is_full()) num_full++;
     }
     return Status::IOError(
@@ -1059,7 +551,7 @@ Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, Data
   int new_uuid_idx = group_uuid_indices.back();
   InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, new_uuid_idx), tablet_id);
   CHECK(!EmplaceOrUpdate(&group_by_tablet_map_, tablet_id, DataDirGroup(group_uuid_indices)));
-  *dir = FindOrDie(data_dir_by_uuid_idx_, new_uuid_idx);
+  *dir = FindOrDie(dir_by_uuid_idx_, new_uuid_idx);
   LOG(INFO) << Substitute("Added $0 to $1's directory group: $2",
                           (*dir)->dir(), tablet_id, s.ToString());
   return Status::OK();
@@ -1070,15 +562,15 @@ void DataDirManager::GetDirsForGroupUnlocked(int target_size,
   DCHECK(dir_group_lock_.is_locked());
   vector<int> candidate_indices;
   unordered_set<int> existing_group_indices(group_indices->begin(), group_indices->end());
-  for (auto& e : data_dir_by_uuid_idx_) {
+  for (auto& e : dir_by_uuid_idx_) {
     int uuid_idx = e.first;
-    DCHECK_LT(uuid_idx, data_dirs_.size());
+    DCHECK_LT(uuid_idx, dirs_.size());
     if (ContainsKey(existing_group_indices, uuid_idx) ||
-        ContainsKey(failed_data_dirs_, uuid_idx)) {
+        ContainsKey(failed_dirs_, uuid_idx)) {
       continue;
     }
-    DataDir* dd = e.second;
-    Status s = dd->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
+    Dir* dd = e.second;
+    Status s = dd->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS);
     WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", dd->dir()));
     if (s.ok() && !dd->is_full()) {
       // TODO(awong): If a disk is unhealthy at the time of group creation, the
@@ -1098,9 +590,9 @@ void DataDirManager::GetDirsForGroupUnlocked(int target_size,
       int selected_index = 0;
       if (tablets_in_first == tablets_in_second &&
           PREDICT_TRUE(FLAGS_fs_data_dirs_consider_available_space)) {
-        int64_t space_in_first = FindOrDie(data_dir_by_uuid_idx_,
+        int64_t space_in_first = FindOrDie(dir_by_uuid_idx_,
                                            candidate_indices[0])->available_bytes();
-        int64_t space_in_second = FindOrDie(data_dir_by_uuid_idx_,
+        int64_t space_in_second = FindOrDie(dir_by_uuid_idx_,
                                             candidate_indices[1])->available_bytes();
         selected_index = space_in_first > space_in_second ? 0 : 1;
       } else {
@@ -1112,41 +604,6 @@ void DataDirManager::GetDirsForGroupUnlocked(int target_size,
   }
 }
 
-DataDir* DataDirManager::FindDataDirByUuidIndex(int uuid_idx) const {
-  DCHECK_LT(uuid_idx, data_dirs_.size());
-  return FindPtrOrNull(data_dir_by_uuid_idx_, uuid_idx);
-}
-
-bool DataDirManager::FindUuidIndexByDataDir(DataDir* dir, int* uuid_idx) const {
-  return FindCopy(uuid_idx_by_data_dir_, dir, uuid_idx);
-}
-
-bool DataDirManager::FindUuidIndexByRoot(const string& root, int* uuid_idx) const {
-  string uuid;
-  if (FindUuidByRoot(root, &uuid)) {
-    return FindUuidIndexByUuid(uuid, uuid_idx);
-  }
-  return false;
-}
-
-bool DataDirManager::FindUuidIndexByUuid(const string& uuid, int* uuid_idx) const {
-  return FindCopy(idx_by_uuid_, uuid, uuid_idx);
-}
-
-bool DataDirManager::FindUuidByRoot(const string& root, string* uuid) const {
-  return FindCopy(uuid_by_root_, root, uuid);
-}
-
-set<string> DataDirManager::FindTabletsByDataDirUuidIdx(int uuid_idx) const {
-  DCHECK_LT(uuid_idx, data_dirs_.size());
-  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
-  const set<string>* tablet_set_ptr = FindOrNull(tablets_by_uuid_idx_map_, uuid_idx);
-  if (tablet_set_ptr) {
-    return *tablet_set_ptr;
-  }
-  return {};
-}
-
 Status DataDirManager::FindDataDirsByTabletId(const string& tablet_id,
                                               vector<string>* data_dirs) const {
   CHECK(data_dirs);
@@ -1159,7 +616,7 @@ Status DataDirManager::FindDataDirsByTabletId(const string& tablet_id,
     if (!FindUuidIndexByUuid(uuid, &uuid_idx)) {
       return Status::NotFound("unable to find index for UUID", uuid);
     }
-    const auto* data_dir = FindDataDirByUuidIndex(uuid_idx);
+    const auto* data_dir = FindDirByUuidIndex(uuid_idx);
     if (!data_dir) {
       return Status::NotFound(
           Substitute("unable to find data dir for UUID $0 with index $1",
@@ -1172,71 +629,18 @@ Status DataDirManager::FindDataDirsByTabletId(const string& tablet_id,
   return Status::OK();
 }
 
-void DataDirManager::MarkDataDirFailedByUuid(const string& uuid) {
-  int uuid_idx;
-  CHECK(FindUuidIndexByUuid(uuid, &uuid_idx));
-  WARN_NOT_OK(MarkDataDirFailed(uuid_idx), "Failed to handle disk failure");
-}
-
-Status DataDirManager::MarkDataDirFailed(int uuid_idx, const string& error_message) {
-  DCHECK_LT(uuid_idx, data_dirs_.size());
-  std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
-  DataDir* dd = FindDataDirByUuidIndex(uuid_idx);
-  DCHECK(dd);
-  if (InsertIfNotPresent(&failed_data_dirs_, uuid_idx)) {
-    if (failed_data_dirs_.size() == data_dirs_.size()) {
-      // TODO(awong): pass 'error_message' as a Status instead of an string so
-      // we can avoid returning this artificial status.
-      return Status::IOError(Substitute("All data dirs have failed: ", error_message));
-    }
-    if (metrics_) {
-      metrics_->data_dirs_failed->IncrementBy(1);
-    }
-    string error_prefix = "";
-    if (!error_message.empty()) {
-      error_prefix = Substitute("$0: ", error_message);
-    }
-    LOG(ERROR) << error_prefix << Substitute("Directory $0 marked as failed", dd->dir());
-  }
-  return Status::OK();
-}
-
-bool DataDirManager::IsDataDirFailed(int uuid_idx) const {
-  DCHECK_LT(uuid_idx, data_dirs_.size());
-  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
-  return ContainsKey(failed_data_dirs_, uuid_idx);
-}
-
-bool DataDirManager::IsTabletInFailedDir(const string& tablet_id) const {
-  const set<int> failed_dirs = GetFailedDataDirs();
-  for (int failed_dir : failed_dirs) {
-    if (ContainsKey(FindTabletsByDataDirUuidIdx(failed_dir), tablet_id)) {
-      return true;
-    }
-  }
-  return false;
-}
-
 void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_indices,
                                                      vector<int>* healthy_indices) const {
-  if (PREDICT_TRUE(failed_data_dirs_.empty())) {
+  if (PREDICT_TRUE(failed_dirs_.empty())) {
     return;
   }
   healthy_indices->clear();
   for (int uuid_idx : uuid_indices) {
-    if (!ContainsKey(failed_data_dirs_, uuid_idx)) {
+    if (!ContainsKey(failed_dirs_, uuid_idx)) {
       healthy_indices->emplace_back(uuid_idx);
     }
   }
 }
 
-vector<string> DataDirManager::GetDataRoots() const {
-  return GetRootNames(canonicalized_data_fs_roots_);
-}
-
-vector<string> DataDirManager::GetDataDirs() const {
-  return JoinPathSegmentsV(GetDataRoots(), kDataDirName);
-}
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index 18f989d..c308372 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -17,53 +17,37 @@
 
 #pragma once
 
-#include <cstdint>
 #include <memory>
-#include <mutex>
-#include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
 
-#include "kudu/gutil/callback.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
 class DataDirGroupPB;
 class Env;
+class MetricEntity;
 class ThreadPool;
 
-// We pass around the results of canonicalization to indicate to the
-// DataDirManager which, if any, failed to canonicalize.
-//
-// TODO(awong): move the canonicalization of directories into the
-// DataDirManager so we can avoid this extra plumbing.
-struct CanonicalizedRootAndStatus {
-  std::string path;
-  Status status;
-};
-typedef std::vector<CanonicalizedRootAndStatus> CanonicalizedRootsList;
-
 namespace fs {
 
-typedef std::unordered_map<int, std::string> UuidByUuidIndexMap;
-typedef std::unordered_map<std::string, int> UuidIndexByUuidMap;
-
-struct CreateBlockOptions;
 class DirInstanceMetadataFile;
+struct CreateBlockOptions;
 
 const char kInstanceMetadataFileName[] = "block_manager_instance";
 const char kDataDirName[] = "data";
 
+struct DataDirMetrics : public DirMetrics {
+  explicit DataDirMetrics(const scoped_refptr<MetricEntity>& metric_entity);
+};
+
 namespace internal {
 
 // A DataDirGroup is a group of directories used by an entity for block
@@ -102,157 +86,28 @@ class DataDirGroup {
 
 }  // namespace internal
 
-// Detected type of filesystem.
-enum class DataDirFsType {
-  // ext2, ext3, or ext4.
-  EXT,
-
-  // SGI xfs.
-  XFS,
-
-  // None of the above.
-  OTHER
-};
-
-// Defines the behavior when opening a directory manager that has an
-// inconsistent or incomplete set of instance files.
-enum UpdateInstanceBehavior {
-  // If the data directories don't match the on-disk dir sets, update the
-  // on-disk data to match if not in read-only mode.
-  UPDATE_AND_IGNORE_FAILURES,
-
-  // Like UPDATE_AND_IGNORE_FAILURES, but will return an error if any of the updates to the
-  // on-disk files fail.
-  UPDATE_AND_ERROR_ON_FAILURE,
-
-  // If the data directories don't match the on-disk dir sets, continue without
-  // updating the on-disk data.
-  DONT_UPDATE
-};
-
-struct DataDirMetrics {
-  explicit DataDirMetrics(const scoped_refptr<MetricEntity>& entity);
-
-  scoped_refptr<AtomicGauge<uint64_t>> data_dirs_failed;
-  scoped_refptr<AtomicGauge<uint64_t>> data_dirs_full;
-};
-
-// Representation of a data directory in use by the block manager.
-class DataDir {
+// Instantiation of a directory that uses the appropriate gflags.
+class DataDir : public Dir {
  public:
   DataDir(Env* env,
-          DataDirMetrics* metrics,
-          DataDirFsType fs_type,
+          DirMetrics* metrics,
+          FsType fs_type,
           std::string dir,
           std::unique_ptr<DirInstanceMetadataFile> metadata_file,
           std::unique_ptr<ThreadPool> pool);
-  ~DataDir();
-
-  // Shuts down this dir's thread pool, waiting for any closures submitted via
-  // ExecClosure() to finish first.
-  void Shutdown();
-
-  // Run a task on this dir's thread pool.
-  //
-  // Normally the task is performed asynchronously. However, if submission to
-  // the pool fails, it runs synchronously on the current thread.
-  void ExecClosure(const Closure& task);
-
-  // Waits for any outstanding closures submitted via ExecClosure() to finish.
-  void WaitOnClosures();
-
-  // Tests whether the data directory is full by comparing the free space of
-  // its underlying filesystem with a predefined "reserved" space value.
-  //
-  // If 'mode' is EXPIRED_ONLY, performs the test only if the dir was last
-  // determined to be full some time ago. If 'mode' is ALWAYS, the test is
-  // performed regardless.
-  //
-  // Only returns a bad Status in the event of a real error; fullness is
-  // reflected via is_full().
-  enum class RefreshMode {
-    EXPIRED_ONLY,
-    ALWAYS,
-  };
-  Status RefreshAvailableSpace(RefreshMode mode);
-
-  DataDirFsType fs_type() const { return fs_type_; }
 
-  const std::string& dir() const { return dir_; }
-
-  const DirInstanceMetadataFile* instance() const {
-    return metadata_file_.get();
-  }
-
-  bool is_full() const {
-    std::lock_guard<simple_spinlock> l(lock_);
-    return is_full_;
-  }
-
-  int64_t available_bytes() {
-    std::lock_guard<simple_spinlock> l(lock_);
-    return available_bytes_;
-  }
-
- private:
-  Env* env_;
-  DataDirMetrics* metrics_;
-  const DataDirFsType fs_type_;
-  const std::string dir_;
-  const std::unique_ptr<DirInstanceMetadataFile> metadata_file_;
-  const std::unique_ptr<ThreadPool> pool_;
-
-  bool is_shutdown_;
-
-  // Protects 'last_space_check_', 'is_full_' and available_bytes_.
-  mutable simple_spinlock lock_;
-  MonoTime last_space_check_;
-  bool is_full_;
-
-  // The available bytes of this dir, updated by RefreshAvailableSpace.
-  int64_t available_bytes_;
-
-  DISALLOW_COPY_AND_ASSIGN(DataDir);
+  int available_space_cache_secs() const override;
+  int reserved_bytes() const override;
 };
 
-// Directory manager creation options.
-struct DataDirManagerOptions {
+struct DataDirManagerOptions : public DirManagerOptions {
   DataDirManagerOptions();
-
-  // The block manager type the directory manager should support.
-  // Must be either "file" or "log".
-  //
-  // Defaults to the value of FLAGS_block_manager.
-  std::string block_manager_type;
-
-  // The entity under which all metrics should be grouped. If null, metrics
-  // will not be produced.
-  //
-  // Defaults to null.
-  scoped_refptr<MetricEntity> metric_entity;
-
-  // Whether the directory manager should only allow reading.
-  //
-  // Defaults to false.
-  bool read_only;
-
-  // Whether to update the on-disk instances when opening directories if
-  // inconsistencies are detected.
-  //
-  // Defaults to UPDATE_AND_IGNORE_FAILURES.
-  UpdateInstanceBehavior update_instances;
 };
 
 // Encapsulates knowledge of data directory management on behalf of block
 // managers.
-class DataDirManager {
+class DataDirManager : public DirManager {
  public:
-  enum class LockMode {
-    MANDATORY,
-    OPTIONAL,
-    NONE,
-  };
-
   enum class DirDistributionMode {
     ACROSS_ALL_DIRS,
     USE_FLAG_SPEC,
@@ -260,18 +115,20 @@ class DataDirManager {
 
   // Public static initializers for use in tests. When used, data_fs_roots is
   // expected to be the successfully canonicalized directories.
-  static Status CreateNewForTests(Env* env, std::vector<std::string> data_fs_roots,
-                                  DataDirManagerOptions opts,
+  static Status CreateNewForTests(Env* env,
+                                  std::vector<std::string> data_fs_roots,
+                                  const DataDirManagerOptions& opts,
                                   std::unique_ptr<DataDirManager>* dd_manager);
-  static Status OpenExistingForTests(Env* env, std::vector<std::string> data_fs_roots,
-                                     DataDirManagerOptions opts,
+  static Status OpenExistingForTests(Env* env,
+                                     std::vector<std::string> data_fs_roots,
+                                     const DataDirManagerOptions& opts,
                                      std::unique_ptr<DataDirManager>* dd_manager);
 
   // Constructs a directory manager and creates its necessary files on-disk.
   //
   // Returns an error if any of the directories already exist.
   static Status CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
-                          DataDirManagerOptions opts,
+                          const DataDirManagerOptions& opts,
                           std::unique_ptr<DataDirManager>* dd_manager);
 
   // Constructs a directory manager and indexes the files found on-disk.
@@ -279,29 +136,9 @@ class DataDirManager {
   // Returns an error if the number of on-disk directories found exceeds the
   // max allowed, or if locks need to be acquired and cannot be.
   static Status OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
-                             DataDirManagerOptions opts,
+                             const DataDirManagerOptions& opts,
                              std::unique_ptr<DataDirManager>* dd_manager);
 
-  // Returns the root names from the input 'root_list'.
-  static std::vector<std::string> GetRootNames(const CanonicalizedRootsList& root_list);
-
-  ~DataDirManager();
-
-  // Shuts down all directories' thread pools.
-  void Shutdown();
-
-  // Waits on all directories' thread pools.
-  void WaitOnClosures();
-
-  // Returns a list of all data dirs.
-  const std::vector<std::unique_ptr<DataDir>>& data_dirs() const {
-    return data_dirs_;
-  }
-
-  // ==========================================================================
-  // Tablet Placement
-  // ==========================================================================
-
   // Deserializes a DataDirGroupPB and associates the resulting DataDirGroup
   // with a tablet_id.
   //
@@ -338,74 +175,20 @@ class DataDirManager {
   // Returns a dir for block placement in the data dir group specified in
   // 'opts'. If none exists, adds a new dir to the group and returns the dir,
   // and if none can be added, returns an error.
-  Status GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir);
-
-  // Finds the set of tablet_ids in the data dir specified by 'uuid_idx' and
-  // returns a copy, returning an empty set if none are found.
-  std::set<std::string> FindTabletsByDataDirUuidIdx(int uuid_idx) const;
+  Status GetDirAddIfNecessary(const CreateBlockOptions& opts, Dir** dir);
 
   // Returns in 'data_dirs' a sorted list of the directory names for the data
   // dirs of the tablet specified by 'tablet_id'.
   Status FindDataDirsByTabletId(const std::string& tablet_id,
                                 std::vector<std::string>* data_dirs) const;
 
-  // ==========================================================================
-  // Directory Health
-  // ==========================================================================
-
-  // Adds 'uuid_idx' to the set of failed data directories. This directory will
-  // no longer be used. Logs an error message prefixed with 'error_message'
-  // describing what directories are affected.
-  //
-  // Returns an error if all directories have failed.
-  Status MarkDataDirFailed(int uuid_idx, const std::string& error_message = "");
-
-  // Fails the directory specified by 'uuid' and logs a warning if all
-  // directories have failed.
-  void MarkDataDirFailedByUuid(const std::string& uuid);
-
-  // Returns whether or not the 'uuid_idx' refers to a failed directory.
-  bool IsDataDirFailed(int uuid_idx) const;
-
-  // Returns whether the tablet's data is spread across a failed directory.
-  bool IsTabletInFailedDir(const std::string& tablet_id) const;
-
-  const std::set<int> GetFailedDataDirs() const {
-    shared_lock<rw_spinlock> group_lock(dir_group_lock_.get_lock());
-    return failed_data_dirs_;
-  }
-
-  // ==========================================================================
-  // Directory Paths
-  // ==========================================================================
-
-  // Return a list of the canonicalized root directory names.
-  std::vector<std::string> GetDataRoots() const;
-
-  // Return a list of the canonicalized data directory names.
-  std::vector<std::string> GetDataDirs() const;
-
-  // ==========================================================================
-  // Representation Conversion
-  // ==========================================================================
-
-  // Finds a data directory by uuid index, returning null if it can't be found.
-  //
-  // More information on uuid indexes and their relation to data directories
-  // can be found next to DirSetPB in fs.proto.
-  DataDir* FindDataDirByUuidIndex(int uuid_idx) const;
-
-  // Finds a uuid index by data directory, returning false if it can't be found.
-  bool FindUuidIndexByDataDir(DataDir* dir, int* uuid_idx) const;
-
-  // Finds a uuid index by root path, returning false if it can't be found.
-  bool FindUuidIndexByRoot(const std::string& root, int* uuid_idx) const;
-
-  // Finds a uuid index by UUID, returning false if it can't be found.
-  bool FindUuidIndexByUuid(const std::string& uuid, int* uuid_idx) const;
-
-  // Finds a UUID by canonicalized root name, returning false if it can't be found.
-  bool FindUuidByRoot(const std::string& root, std::string* uuid) const;
+  // Create a new data directory.
+  std::unique_ptr<Dir> CreateNewDir(Env* env,
+                                    DirMetrics* metrics,
+                                    FsType fs_type,
+                                    std::string dir,
+                                    std::unique_ptr<DirInstanceMetadataFile> metadata_file,
+                                    std::unique_ptr<ThreadPool> pool) override;
 
  private:
   FRIEND_TEST(DataDirsTest, TestCreateGroup);
@@ -414,81 +197,31 @@ class DataDirManager {
   FRIEND_TEST(DataDirsTest, TestLoadBalancingDistribution);
   FRIEND_TEST(DataDirsTest, TestFailedDirNotAddedToGroup);
 
-  // Constructs a directory manager.
-  DataDirManager(Env* env,
-                 DataDirManagerOptions opts,
-                 CanonicalizedRootsList canonicalized_data_roots);
+  // Populates the maps to index the given directories.
+  Status PopulateDirectoryMaps(const std::vector<std::unique_ptr<Dir>>& dirs) override;
 
-  // Initializes the data directories on disk. Returns an error if initialized
-  // directories already exist.
-  //
-  // Note: this doesn't initialize any in-memory state for the directory
-  // manager.
-  Status Create();
+  const char* dir_name() const override {
+    return kDataDirName;
+  }
 
-  // Opens existing instance files from disk and indexes the files found.
-  //
-  // Returns an error if the number of on-disk directories found exceeds the
-  // max allowed, if locks need to be acquired and cannot be, or if there are
-  // no healthy directories.
-  //
-  // If appropriate, this will create any missing directories and rewrite
-  // existing instance files to be consistent with each other.
-  Status Open();
+  const char* instance_metadata_filename() const override {
+    return kInstanceMetadataFileName;
+  }
 
-  // Populates the maps to index the given directories.
-  Status PopulateDirectoryMaps(const std::vector<std::unique_ptr<DataDir>>& dds);
+  bool sync_dirs() const override;
+  bool lock_dirs() const override;
+  int max_dirs() const override;
 
-  // Loads the instance files for each directory root.
-  //
-  // On success, 'instance_files' contains instance objects, including those
-  // that failed to load because they were missing or because of a disk
-  // error; they are still considered "loaded" and are labeled unhealthy
-  // internally. 'has_existing_instances' is set to true if any of the instance
-  // files are healthy.
-  //
-  // Returns an error if an instance file fails in an irreconcileable way (e.g.
-  // the file is locked).
-  Status LoadInstances(
-      std::vector<std::unique_ptr<DirInstanceMetadataFile>>* instance_files,
-      bool* has_existing_instances);
-
-  // Takes the set of instance files, does some basic verification on them,
-  // creates any that don't exist on disk, and updates any that have a
-  // different set of UUIDs stored than the expected set.
-  //
-  // Returns an error if there is a configuration error, e.g. if the existing
-  // instances believe there should be a different block size.
-  //
-  // If in UPDATE_AND_IGNORE_FAILURES mode, an error is not returned in the event of a disk
-  // error. Instead, it is up to the caller to reload the instance files and
-  // proceed if healthy enough.
-  //
-  // If in UPDATE_AND_ERROR_ON_FAILURE mode, a failure to update instances will
-  // surface as an error.
-  Status CreateNewDirectoriesAndUpdateInstances(
-      std::vector<std::unique_ptr<DirInstanceMetadataFile>> instances);
-
-  // Updates the on-disk instance files specified by 'instances_to_update'
-  // (presumably those whose 'all_uuids' field doesn't match 'new_all_uuids')
-  // using the contents of 'new_all_uuids', skipping any unhealthy instance
-  // files.
-  //
-  // If in UPDATE_AND_IGNORE_FAILURES mode, this is best effort. If any of the instance
-  // updates fail (e.g. due to a disk error) in this mode, this will log a
-  // warning about the failed updates and return OK.
-  //
-  // If in UPDATE_AND_ERROR_ON_FAILURE mode, any failure will immediately attempt
-  // to clean up any altered state and return with an error.
-  Status UpdateHealthyInstances(
-      const std::vector<std::unique_ptr<DirInstanceMetadataFile>>& instances_to_update,
-      const std::set<std::string>& new_all_uuids);
+  // Constructs a directory manager.
+  DataDirManager(Env* env,
+                 const DataDirManagerOptions& opts,
+                 CanonicalizedRootsList canonicalized_data_roots);
 
   // Returns a random directory in the data dir group specified in 'opts',
   // giving preference to those with more free space. If there is no room in
   // the group, returns an IOError with the ENOSPC posix code and returns the
   // new target size for the data dir group.
-  Status GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+  Status GetDirForBlock(const CreateBlockOptions& opts, Dir** dir,
                         int* new_target_group_size) const;
 
   // Repeatedly selects directories from those available to put into a new
@@ -513,53 +246,9 @@ class DataDirManager {
   void RemoveUnhealthyDataDirsUnlocked(const std::vector<int>& uuid_indices,
                                        std::vector<int>* healthy_indices) const;
 
-  // The environment to be used for all data directory operations.
-  Env* env_;
-
-  // The options that the DataDirManager was created with.
-  const DataDirManagerOptions opts_;
-
-  // The canonicalized roots provided to the constructor, taken verbatim.
-  //
-  // - The first data root is used as the metadata root.
-  // - Common roots in the collections have been deduplicated.
-  const CanonicalizedRootsList canonicalized_data_fs_roots_;
-
-  std::unique_ptr<DataDirMetrics> metrics_;
-
-  std::vector<std::unique_ptr<DataDir>> data_dirs_;
-
-  typedef std::unordered_map<std::string, std::string> UuidByRootMap;
-  UuidByRootMap uuid_by_root_;
-
-  typedef std::unordered_map<int, DataDir*> UuidIndexMap;
-  UuidIndexMap data_dir_by_uuid_idx_;
-
-  typedef std::unordered_map<DataDir*, int> ReverseUuidIndexMap;
-  ReverseUuidIndexMap uuid_idx_by_data_dir_;
-
   typedef std::unordered_map<std::string, internal::DataDirGroup> TabletDataDirGroupMap;
   TabletDataDirGroupMap group_by_tablet_map_;
 
-  typedef std::unordered_map<int, std::set<std::string>> TabletsByUuidIndexMap;
-  TabletsByUuidIndexMap tablets_by_uuid_idx_map_;
-
-  UuidByUuidIndexMap uuid_by_idx_;
-  UuidIndexByUuidMap idx_by_uuid_;
-
-  typedef std::set<int> FailedDataDirSet;
-  FailedDataDirSet failed_data_dirs_;
-
-  // Lock protecting access to the dir group maps and to failed_data_dirs_.
-  // A percpu_rwlock is used so threads attempting to read (e.g. to get the
-  // next data directory for a Flush()) do not block each other, while threads
-  // attempting to write (e.g. to create a new tablet, thereby creating a new
-  // data directory group) block all threads.
-  mutable percpu_rwlock dir_group_lock_;
-
-  // RNG used to select directories.
-  mutable ThreadSafeRandom rng_;
-
   DISALLOW_COPY_AND_ASSIGN(DataDirManager);
 };
 
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
new file mode 100644
index 0000000..ea420c2
--- /dev/null
+++ b/src/kudu/fs/dir_manager.cc
@@ -0,0 +1,693 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/fs/dir_manager.h"
+
+#include <errno.h>
+
+#include <algorithm>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <set>
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/fs/dir_util.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
+
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for parallel
+// execution on a data directory's thread pool (which requires the return value
+// be void).
+void DeleteTmpFilesRecursively(Env* env, const string& path) {
+  WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(env, path),
+              "Error while deleting temp files");
+}
+
+} // anonymous namespace
+namespace fs {
+
+Dir::Dir(Env* env,
+         DirMetrics* metrics,
+         FsType fs_type,
+         string dir,
+         unique_ptr<DirInstanceMetadataFile> metadata_file,
+         unique_ptr<ThreadPool> pool)
+    : env_(env),
+      metrics_(metrics),
+      fs_type_(fs_type),
+      dir_(std::move(dir)),
+      metadata_file_(std::move(metadata_file)),
+      pool_(std::move(pool)),
+      is_shutdown_(false),
+      is_full_(false),
+      available_bytes_(0) {
+}
+
+Dir::~Dir() {
+  Shutdown();
+}
+
+void Dir::Shutdown() {
+  if (is_shutdown_) {
+    return;
+  }
+
+  WaitOnClosures();
+  pool_->Shutdown();
+  is_shutdown_ = true;
+}
+
+void Dir::ExecClosure(const Closure& task) {
+  Status s = pool_->SubmitClosure(task);
+  if (!s.ok()) {
+    WARN_NOT_OK(
+        s, "Could not submit task to thread pool, running it synchronously");
+    task.Run();
+  }
+}
+
+void Dir::WaitOnClosures() {
+  pool_->Wait();
+}
+
+Status Dir::RefreshAvailableSpace(RefreshMode mode) {
+  switch (mode) {
+    case RefreshMode::EXPIRED_ONLY: {
+      std::lock_guard<simple_spinlock> l(lock_);
+      DCHECK(last_space_check_.Initialized());
+      MonoTime expiry = last_space_check_ + MonoDelta::FromSeconds(
+          available_space_cache_secs());
+      if (MonoTime::Now() < expiry) {
+        break;
+      }
+      FALLTHROUGH_INTENDED; // Root was previously full, check again.
+    }
+    case RefreshMode::ALWAYS: {
+      int64_t available_bytes_new;
+      Status s = env_util::VerifySufficientDiskSpace(
+          env_, dir_, 0, reserved_bytes(), &available_bytes_new);
+      bool is_full_new;
+      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
+        LOG(WARNING) << Substitute(
+            "Insufficient disk space under path $0: will retry after $1 seconds: $2",
+            dir_, available_space_cache_secs(), s.ToString());
+        s = Status::OK();
+        is_full_new = true;
+      } else {
+        is_full_new = false;
+      }
+      RETURN_NOT_OK_PREPEND(s, "Could not refresh fullness"); // Catch other types of IOErrors, etc.
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        if (metrics_ && is_full_ != is_full_new) {
+          metrics_->dirs_full->IncrementBy(is_full_new ? 1 : -1);
+        }
+        is_full_ = is_full_new;
+        last_space_check_ = MonoTime::Now();
+        available_bytes_ = available_bytes_new;
+      }
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown check mode";
+  }
+  return Status::OK();
+}
+
+DirManagerOptions::DirManagerOptions(const string& dir_type)
+    : dir_type(dir_type), read_only(false),
+      update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {}
+
+
+vector<string> DirManager::GetRootNames(const CanonicalizedRootsList& root_list) {
+  vector<string> roots;
+  std::transform(root_list.begin(), root_list.end(), std::back_inserter(roots),
+    [&] (const CanonicalizedRootAndStatus& r) { return r.path; });
+  return roots;
+}
+
+vector<string> DirManager::GetRoots() const {
+  return GetRootNames(canonicalized_fs_roots_);
+}
+
+vector<string> DirManager::GetDirs() const {
+  return JoinPathSegmentsV(GetRoots(), dir_name());
+}
+
+DirManager::DirManager(Env* env,
+                       unique_ptr<DirMetrics> dir_metrics,
+                       int num_threads_per_dir,
+                       const DirManagerOptions& opts,
+                       CanonicalizedRootsList canonicalized_data_roots)
+    : env_(env),
+      num_threads_per_dir_(num_threads_per_dir),
+      opts_(opts),
+      canonicalized_fs_roots_(std::move(canonicalized_data_roots)),
+      metrics_(std::move(dir_metrics)),
+      rng_(GetRandomSeed32()) {
+  DCHECK_GT(canonicalized_fs_roots_.size(), 0);
+  DCHECK(opts_.update_instances == UpdateInstanceBehavior::DONT_UPDATE || !opts_.read_only);
+  DCHECK(!opts_.dir_type.empty());
+}
+
+DirManager::~DirManager() {
+  Shutdown();
+}
+
+void DirManager::WaitOnClosures() {
+  for (const auto& dir : dirs_) {
+    dir->WaitOnClosures();
+  }
+}
+
+void DirManager::Shutdown() {
+  // We may be waiting here for a while on outstanding closures.
+  LOG_SLOW_EXECUTION(INFO, 1000,
+                     Substitute("waiting on $0 block manager thread pools",
+                                dirs_.size())) {
+    for (const auto& dir : dirs_) {
+      dir->Shutdown();
+    }
+  }
+}
+
+Status DirManager::Create() {
+  CHECK(!opts_.read_only);
+
+  vector<string> all_uuids;
+  for (const auto& r : canonicalized_fs_roots_) {
+    RETURN_NOT_OK_PREPEND(r.status, "Could not create directory manager with disks failed");
+  }
+  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
+  bool has_existing_instances;
+  RETURN_NOT_OK(LoadInstances(&loaded_instances, &has_existing_instances));
+  if (has_existing_instances) {
+    return Status::AlreadyPresent("instance files already exist");
+  }
+
+  // If none of the instances exist, we can assume this is a new deployment and
+  // we should try creating a new set of instance files.
+  RETURN_NOT_OK_PREPEND(CreateNewDirectoriesAndUpdateInstances(std::move(loaded_instances)),
+                        "could not create new data directories");
+  return Status::OK();
+}
+
+Status DirManager::CreateNewDirectoriesAndUpdateInstances(
+    vector<unique_ptr<DirInstanceMetadataFile>> instances) {
+  CHECK(!opts_.read_only);
+  CHECK_NE(UpdateInstanceBehavior::DONT_UPDATE, opts_.update_instances);
+
+  vector<string> created_dirs;
+  vector<string> created_files;
+  auto deleter = MakeScopedCleanup([&]() {
+    // Delete files first so that the directories will be empty when deleted.
+    for (const auto& f : created_files) {
+      WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f);
+    }
+    // Delete directories in reverse order since parent directories will have
+    // been added before child directories.
+    for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) {
+      WARN_NOT_OK(env_->DeleteDir(*it), "Could not delete dir " + *it);
+    }
+  });
+
+  // First, de-duplicate the instance UUIDs. If we have duplicates, something's
+  // wrong. Maybe an operator manually duplicated some instance files.
+  set<string> all_uuids;
+  for (const auto& instance : instances) {
+    InsertIfNotPresent(&all_uuids, instance->uuid());
+  }
+  if (all_uuids.size() != instances.size()) {
+    return Status::InvalidArgument(
+        Substitute("instance files contain duplicate UUIDs: $0 directories provided, "
+                   "$1 unique UUIDs found ($2)", instances.size(), all_uuids.size(),
+                   JoinStrings(all_uuids, ", ")));
+  }
+
+  // Determine which instance files are healthy (and can thus be updated), and
+  // which don't exist. Create any that don't exist.
+  //
+  // Note: we don't bother trying to create/update the instance if the file is
+  // otherwise unhealthy.
+  vector<unique_ptr<DirInstanceMetadataFile>> healthy_instances;
+  for (auto& instance : instances) {
+    if (instance->healthy()) {
+      healthy_instances.emplace_back(std::move(instance));
+      continue;
+    }
+    if (instance->health_status().IsNotFound()) {
+      bool created_dir = false;
+      RETURN_NOT_OK(instance->Create(all_uuids, &created_dir));
+      if (created_dir) {
+        created_dirs.emplace_back(instance->dir());
+      }
+      created_files.emplace_back(instance->path());
+    }
+  }
+
+  // Go through the healthy instances and look for instances that don't have
+  // the full complete set of instance UUIDs.
+  vector<unique_ptr<DirInstanceMetadataFile>> instances_to_update;
+  for (auto& instance : healthy_instances) {
+    DCHECK(instance->healthy());
+    const auto& dir_set = instance->metadata()->dir_set();
+    set<string> instance_uuids;
+    for (int i = 0; i < dir_set.all_uuids_size(); i++) {
+      InsertIfNotPresent(&instance_uuids, dir_set.all_uuids(i));
+    }
+    // If an instance file disagrees with the expected UUIDs, rewrite it.
+    if (all_uuids != instance_uuids) {
+      instances_to_update.emplace_back(std::move(instance));
+    }
+  }
+
+  // If any of the instance files need to be updated because they didn't match
+  // the expected set of UUIDs, update them now.
+  // Note: Having a consistent set of instance files isn't a correctness
+  // requirement, but it can be useful for debugging.
+  if (!instances_to_update.empty()) {
+    RETURN_NOT_OK(UpdateHealthyInstances(instances_to_update, all_uuids));
+  }
+
+  // Ensure newly created directories are synchronized to disk.
+  if (sync_dirs()) {
+    WARN_NOT_OK(env_util::SyncAllParentDirs(env_, created_dirs, created_files),
+                "could not sync newly created data directories");
+  }
+
+  // Success: don't delete any files.
+  deleter.cancel();
+  return Status::OK();
+}
+
+Status DirManager::UpdateHealthyInstances(
+    const vector<unique_ptr<DirInstanceMetadataFile>>& instances_to_update,
+    const set<string>& new_all_uuids) {
+  unordered_map<string, string> copies_to_restore;
+  unordered_set<string> copies_to_delete;
+  auto cleanup = MakeScopedCleanup([&] {
+    for (const auto& f : copies_to_delete) {
+      WARN_NOT_OK(env_->DeleteFile(f), Substitute("Could not delete file $0", f));
+    }
+    for (const auto& copy_and_original : copies_to_restore) {
+      const auto& copy_filename = copy_and_original.first;
+      const auto& original_filename = copy_and_original.second;
+      WARN_NOT_OK(env_->RenameFile(copy_filename, original_filename),
+          Substitute("Could not restore file $0 from $1", original_filename, copy_filename));
+    }
+  });
+  // Make a copy of every existing instance metadata file. This is done before
+  // performing any updates, so that if there's a failure while copying,
+  // there's no metadata to restore.
+  //
+  // We'll keep track of the copies so we can delete them on success, or use
+  // them to restore on failure.
+  WritableFileOptions opts;
+  opts.sync_on_close = true;
+  for (const auto& instance : instances_to_update) {
+    if (!instance->healthy()) {
+      continue;
+    }
+    const string& instance_filename = instance->path();
+    string copy_filename = instance_filename + kTmpInfix;
+    Status s = env_util::CopyFile(env_, instance_filename, copy_filename, opts);
+    if (PREDICT_FALSE(!s.ok())) {
+      s = s.CloneAndPrepend("unable to backup existing instance file");
+      instance->SetInstanceFailed(s);
+      LOG(WARNING) << s.ToString();
+      continue;
+    }
+    InsertOrDie(&copies_to_delete, copy_filename);
+  }
+
+  // Update the instance metadata files with the new set of UUIDs.
+  for (const auto& instance : instances_to_update) {
+    if (!instance->healthy()) {
+      continue;
+    }
+    const string& instance_filename = instance->path();
+    string copy_filename = instance_filename + kTmpInfix;
+
+    // Put together the PB and perform the update.
+    DirInstanceMetadataPB new_pb = *instance->metadata();
+    new_pb.mutable_dir_set()->mutable_all_uuids()->Clear();
+    for (const auto& uuid : new_all_uuids) {
+      new_pb.mutable_dir_set()->add_all_uuids(uuid);
+    }
+
+    // We're about to update the file; if we fail midway, we should try to
+    // restore them from our backups if we can.
+    InsertOrDie(&copies_to_restore, copy_filename, instance_filename);
+    CHECK_EQ(1, copies_to_delete.erase(copy_filename));
+    Status s = pb_util::WritePBContainerToPath(
+        env_, instance_filename, new_pb, pb_util::OVERWRITE,
+        sync_dirs() ? pb_util::SYNC : pb_util::NO_SYNC);
+    // We've failed to update for some reason, so restore our original file.
+    // Since we're renaming our copy, we don't have to delete it.
+    if (PREDICT_FALSE(!s.ok())) {
+      s = s.CloneAndPrepend("unable to update instance file");
+      instance->SetInstanceFailed(s);
+      LOG(WARNING) << Substitute("unable to overwrite existing instance file $0: $1",
+                                 instance_filename, s.ToString());
+    }
+  }
+
+  // If we are not tolerating errors (e.g. we're running the update_dirs tool)
+  // and we've hit an error, return now and clean up what we've changed.
+  if (opts_.update_instances == UpdateInstanceBehavior::UPDATE_AND_ERROR_ON_FAILURE) {
+    for (const auto& instance : instances_to_update) {
+      RETURN_NOT_OK_PREPEND(instance->health_status(),
+          "at least one instance file failed to update");
+    }
+  }
+
+  // Success; we only need to delete our copies.
+  InsertKeysFromMap(copies_to_restore, &copies_to_delete);
+  copies_to_restore.clear();
+  return Status::OK();
+}
+
+Status DirManager::LoadInstances(
+    vector<unique_ptr<DirInstanceMetadataFile>>* instance_files,
+    bool* has_existing_instances) {
+  LockMode lock_mode;
+  if (!lock_dirs()) {
+    lock_mode = LockMode::NONE;
+  } else if (opts_.read_only) {
+    lock_mode = LockMode::OPTIONAL;
+  } else {
+    lock_mode = LockMode::MANDATORY;
+  }
+  vector<string> missing_roots_tmp;
+  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
+  ObjectIdGenerator gen;
+  for (int i = 0; i < canonicalized_fs_roots_.size(); i++) {
+    const auto& root = canonicalized_fs_roots_[i];
+    string dir = JoinPathSegments(root.path, dir_name());
+    string instance_filename = JoinPathSegments(dir, instance_metadata_filename());
+
+    // Initialize the instance with a backup UUID. In case the load fails, this
+    // will be the UUID for our instance.
+    string backup_uuid = gen.Next();
+    unique_ptr<DirInstanceMetadataFile> instance(
+        new DirInstanceMetadataFile(env_, std::move(backup_uuid), opts_.dir_type,
+                                    instance_filename));
+    if (PREDICT_FALSE(!root.status.ok())) {
+      instance->SetInstanceFailed(root.status);
+    } else {
+      // This may return OK and mark 'instance' as unhealthy if the file could
+      // not be loaded (e.g. not found, disk errors).
+      RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
+                            Substitute("could not load $0", instance_filename));
+    }
+
+    // Try locking the instance.
+    if (instance->healthy() && lock_mode != LockMode::NONE) {
+      // This may return OK and mark 'instance' as unhealthy if the file could
+      // not be locked due to non-locking issues (e.g. disk errors).
+      Status s = instance->Lock();
+      if (!s.ok()) {
+        if (lock_mode == LockMode::OPTIONAL) {
+          LOG(WARNING) << s.ToString();
+          LOG(WARNING) << "Proceeding without lock";
+        } else {
+          DCHECK(LockMode::MANDATORY == lock_mode);
+          return s;
+        }
+      }
+    }
+    loaded_instances.emplace_back(std::move(instance));
+  }
+
+  int num_healthy_instances = 0;
+  for (const auto& instance : loaded_instances) {
+    if (instance->healthy()) {
+      num_healthy_instances++;
+    }
+  }
+  if (has_existing_instances) {
+    *has_existing_instances = num_healthy_instances > 0;
+  }
+  instance_files->swap(loaded_instances);
+  return Status::OK();
+}
+
+Status DirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& dirs) {
+  // Go through our instances and assign them each a UUID index.
+  for (int idx = 0; idx < dirs.size(); idx++) {
+    Dir* dir = dirs[idx].get();
+    InsertToMaps(dir->instance()->uuid(), idx, dir);
+  }
+  return Status::OK();
+}
+
+void DirManager::InsertToMaps(const string& uuid, int idx, Dir* dir) {
+  if (!dir->instance()->healthy()) {
+    if (metrics_) {
+      metrics_->dirs_failed->IncrementBy(1);
+    }
+    InsertOrDie(&failed_dirs_, idx);
+  }
+  InsertOrDie(&uuid_by_root_, DirName(dir->dir()), uuid);
+  InsertOrDie(&uuid_by_idx_, idx, uuid);
+  InsertOrDie(&idx_by_uuid_, uuid, idx);
+  InsertOrDie(&dir_by_uuid_idx_, idx, dir);
+  InsertOrDie(&uuid_idx_by_dir_, dir, idx);
+  InsertOrDie(&tablets_by_uuid_idx_map_, idx, {});
+}
+
+Status DirManager::Open() {
+  if (canonicalized_fs_roots_.size() > max_dirs()) {
+    return Status::InvalidArgument(Substitute("too many directories provided $0, max is $1",
+                                              canonicalized_fs_roots_.size(), max_dirs()));
+  }
+
+  vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances;
+  // Load the instance files from disk.
+  bool has_existing_instances;
+  RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances),
+      "failed to load instance files");
+  if (!has_existing_instances) {
+    return Status::NotFound(
+        "could not open directory manager, no healthy directories found");
+  }
+  // Note: the file block manager should not be updated because its block
+  // indexing algorithm depends on a fixed set of directories.
+  if (!opts_.read_only && opts_.dir_type != "file" &&
+      opts_.update_instances != UpdateInstanceBehavior::DONT_UPDATE) {
+    RETURN_NOT_OK_PREPEND(
+        CreateNewDirectoriesAndUpdateInstances(
+            std::move(loaded_instances)),
+            "could not add new directories");
+    RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances),
+                          "failed to load instance files after updating");
+    if (!has_existing_instances) {
+      return Status::IOError(
+          "could not open directory manager, no healthy directories found");
+    }
+  }
+
+  // All instances are present and accounted for. Time to create the in-memory
+  // directory structures.
+  vector<unique_ptr<Dir>> dirs;
+  for (int i = 0; i < loaded_instances.size(); i++) {
+    auto& instance = loaded_instances[i];
+    const string dir = instance->dir();
+
+    // Figure out what filesystem the directory is on.
+    FsType fs_type = FsType::OTHER;
+    if (instance->healthy()) {
+      bool result = false;
+      Status fs_check = env_->IsOnExtFilesystem(dir, &result);
+      if (fs_check.ok()) {
+        if (result) {
+          fs_type = FsType::EXT;
+        } else {
+          fs_check = env_->IsOnXfsFilesystem(dir, &result);
+          if (fs_check.ok() && result) {
+            fs_type = FsType::XFS;
+          }
+        }
+      }
+      // If we hit a disk error, consider the directory failed.
+      if (PREDICT_FALSE(fs_check.IsDiskFailure())) {
+        instance->SetInstanceFailed(fs_check.CloneAndPrepend("failed to check FS type"));
+      } else {
+        RETURN_NOT_OK(fs_check);
+      }
+    }
+
+    // Create a per-dir thread pool.
+    unique_ptr<ThreadPool> pool;
+    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("dir $0", i))
+                  .set_max_threads(num_threads_per_dir_)
+                  .set_trace_metric_prefix("dirs")
+                  .Build(&pool));
+    unique_ptr<Dir> new_dir = CreateNewDir(env_, metrics_.get(), fs_type, dir, std::move(instance),
+                                           std::move(pool));
+    dirs.emplace_back(std::move(new_dir));
+  }
+
+  // Use the per-dir thread pools to delete temporary files in parallel.
+  for (const auto& dir : dirs) {
+    if (dir->instance()->healthy()) {
+      dir->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dir->dir()));
+    }
+  }
+  for (const auto& dir : dirs) {
+    dir->WaitOnClosures();
+  }
+
+  RETURN_NOT_OK(PopulateDirectoryMaps(dirs));
+  dirs_ = std::move(dirs);
+
+  // From this point onwards, the in-memory maps are the source of truth about
+  // the state of each dir.
+
+  // Initialize the 'fullness' status of the directories.
+  for (const auto& dd : dirs_) {
+    int uuid_idx;
+    CHECK(FindUuidIndexByDir(dd.get(), &uuid_idx));
+    if (ContainsKey(failed_dirs_, uuid_idx)) {
+      continue;
+    }
+    Status refresh_status = dd->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS);
+    if (PREDICT_FALSE(!refresh_status.ok())) {
+      if (refresh_status.IsDiskFailure()) {
+        RETURN_NOT_OK(MarkDirFailed(uuid_idx, refresh_status.ToString()));
+        continue;
+      }
+      return refresh_status;
+    }
+  }
+  return Status::OK();
+}
+
+Dir* DirManager::FindDirByUuidIndex(int uuid_idx) const {
+  DCHECK_LT(uuid_idx, dirs_.size());
+  return FindPtrOrNull(dir_by_uuid_idx_, uuid_idx);
+}
+
+bool DirManager::FindUuidIndexByDir(Dir* dir, int* uuid_idx) const {
+  return FindCopy(uuid_idx_by_dir_, dir, uuid_idx);
+}
+
+bool DirManager::FindUuidIndexByRoot(const string& root, int* uuid_idx) const {
+  string uuid;
+  if (FindUuidByRoot(root, &uuid)) {
+    return FindUuidIndexByUuid(uuid, uuid_idx);
+  }
+  return false;
+}
+
+bool DirManager::FindUuidIndexByUuid(const string& uuid, int* uuid_idx) const {
+  return FindCopy(idx_by_uuid_, uuid, uuid_idx);
+}
+
+bool DirManager::FindUuidByRoot(const string& root, string* uuid) const {
+  return FindCopy(uuid_by_root_, root, uuid);
+}
+
+set<string> DirManager::FindTabletsByDirUuidIdx(int uuid_idx) const {
+  DCHECK_LT(uuid_idx, dirs_.size());
+  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
+  const set<string>* tablet_set_ptr = FindOrNull(tablets_by_uuid_idx_map_, uuid_idx);
+  if (tablet_set_ptr) {
+    return *tablet_set_ptr;
+  }
+  return {};
+}
+
+void DirManager::MarkDirFailedByUuid(const std::string& uuid) {
+  int uuid_idx;
+  CHECK(FindUuidIndexByUuid(uuid, &uuid_idx));
+  WARN_NOT_OK(MarkDirFailed(uuid_idx), "Failed to handle disk failure");
+}
+
+Status DirManager::MarkDirFailed(int uuid_idx, const string& error_message) {
+  DCHECK_LT(uuid_idx, dirs_.size());
+  std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
+  Dir* dir = FindDirByUuidIndex(uuid_idx);
+  DCHECK(dir);
+  if (InsertIfNotPresent(&failed_dirs_, uuid_idx)) {
+    if (failed_dirs_.size() == dirs_.size()) {
+      // TODO(awong): pass 'error_message' as a Status instead of a string so
+      // we can avoid returning this artificial status.
+      return Status::IOError(Substitute("All dirs have failed: $0", error_message));
+    }
+    if (metrics_) {
+      metrics_->dirs_failed->IncrementBy(1);
+    }
+    string error_prefix = "";
+    if (!error_message.empty()) {
+      error_prefix = Substitute("$0: ", error_message);
+    }
+    LOG(ERROR) << error_prefix << Substitute("Directory $0 marked as failed", dir->dir());
+  }
+  return Status::OK();
+}
+
+
+bool DirManager::IsDirFailed(int uuid_idx) const {
+  DCHECK_LT(uuid_idx, dirs_.size());
+  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
+  return ContainsKey(failed_dirs_, uuid_idx);
+}
+
+bool DirManager::IsTabletInFailedDir(const string& tablet_id) const {
+  const set<int> failed_dirs = GetFailedDirs();
+  for (int failed_dir : failed_dirs) {
+    if (ContainsKey(FindTabletsByDirUuidIdx(failed_dir), tablet_id)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+} // namespace fs
+} // namespace kudu
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
new file mode 100644
index 0000000..864b828
--- /dev/null
+++ b/src/kudu/fs/dir_manager.h
@@ -0,0 +1,436 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+#include <mutex>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class ThreadPool;
+
+// We pass around the results of canonicalization to indicate to the
+// directory manager which, if any, failed to canonicalize.
+//
+// TODO(awong): move the canonicalization of directories into the
+// directory manager so we can avoid this extra plumbing.
+struct CanonicalizedRootAndStatus {
+  std::string path;
+  Status status;
+};
+typedef std::vector<CanonicalizedRootAndStatus> CanonicalizedRootsList;
+
+namespace fs {
+
+typedef std::unordered_map<int, std::string> UuidByUuidIndexMap;
+typedef std::unordered_map<std::string, int> UuidIndexByUuidMap;
+class DirInstanceMetadataFile;
+
+// Defines the behavior when opening a directory manager that has an
+// inconsistent or incomplete set of instance files.
+enum UpdateInstanceBehavior {
+  // If the directories don't match the on-disk dir sets, update the on-disk
+  // data to match if not in read-only mode.
+  UPDATE_AND_IGNORE_FAILURES,
+
+  // Like UPDATE_AND_IGNORE_FAILURES, but will return an error if any of the updates to the
+  // on-disk files fail.
+  UPDATE_AND_ERROR_ON_FAILURE,
+
+  // If the directories don't match the on-disk dir sets, continue without
+  // updating the on-disk data.
+  DONT_UPDATE
+};
+
+struct DirMetrics {
+  scoped_refptr<AtomicGauge<uint64_t>> dirs_failed;
+  scoped_refptr<AtomicGauge<uint64_t>> dirs_full;
+};
+
+// Detected type of filesystem.
+enum class FsType {
+  // ext2, ext3, or ext4.
+  EXT,
+
+  // SGI xfs.
+  XFS,
+
+  // None of the above.
+  OTHER
+};
+
+// Representation of a directory (e.g. a data directory).
+class Dir {
+ public:
+  Dir(Env* env,
+      DirMetrics* metrics,
+      FsType fs_type,
+      std::string dir,
+      std::unique_ptr<DirInstanceMetadataFile> metadata_file,
+      std::unique_ptr<ThreadPool> pool);
+  ~Dir();
+
+  // Shuts down this dir's thread pool, waiting for any closures submitted via
+  // ExecClosure() to finish first.
+  void Shutdown();
+
+  // Run a task on this dir's thread pool.
+  //
+  // Normally the task is performed asynchronously. However, if submission to
+  // the pool fails, it runs synchronously on the current thread.
+  void ExecClosure(const Closure& task);
+
+  // Waits for any outstanding closures submitted via ExecClosure() to finish.
+  void WaitOnClosures();
+
+  // Tests whether the directory is full by comparing the free space of its
+  // underlying filesystem with a predefined "reserved" space value.
+  //
+  // If 'mode' is EXPIRED_ONLY, performs the test only if the dir was last
+  // determined to be full some time ago. If 'mode' is ALWAYS, the test is
+  // performed regardless.
+  //
+  // Only returns a bad Status in the event of a real error; fullness is
+  // reflected via is_full().
+  enum class RefreshMode {
+    EXPIRED_ONLY,
+    ALWAYS,
+  };
+  Status RefreshAvailableSpace(RefreshMode mode);
+
+  FsType fs_type() const { return fs_type_; }
+
+  // Return the full path of this directory.
+  const std::string& dir() const { return dir_; }
+
+  const DirInstanceMetadataFile* instance() const {
+    return metadata_file_.get();
+  }
+
+  bool is_full() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return is_full_;
+  }
+
+  int64_t available_bytes() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return available_bytes_;
+  }
+
+  // The amount of time to cache the amount of available space in this
+  // directory.
+  virtual int available_space_cache_secs() const = 0;
+
+  // The number of bytes to reserve in each directory for non-Kudu usage. A
+  // value of -1 means 1% of the disk space in a directory will be reserved.
+  virtual int reserved_bytes() const = 0;
+
+ private:
+  Env* env_;
+  DirMetrics* metrics_;
+  const FsType fs_type_;
+  const std::string dir_;
+  const std::unique_ptr<DirInstanceMetadataFile> metadata_file_;
+  const std::unique_ptr<ThreadPool> pool_;
+
+  bool is_shutdown_;
+
+  // Protects 'last_space_check_', 'is_full_' and available_bytes_.
+  mutable simple_spinlock lock_;
+  MonoTime last_space_check_;
+  bool is_full_;
+
+  // The available bytes of this dir, updated by RefreshAvailableSpace.
+  int64_t available_bytes_;
+
+  DISALLOW_COPY_AND_ASSIGN(Dir);
+};
+
+struct DirManagerOptions {
+ public:
+  // The type of directory this directory manager should support.
+  //
+  // Must not be empty.
+  std::string dir_type;
+
+  // The entity under which all metrics should be grouped. If null, metrics
+  // will not be produced.
+  //
+  // Defaults to null.
+  scoped_refptr<MetricEntity> metric_entity;
+
+  // Whether the directory manager should only allow reading.
+  //
+  // Defaults to false.
+  bool read_only;
+
+  // Whether to update the on-disk instances when opening directories if
+  // inconsistencies are detected.
+  //
+  // Defaults to UPDATE_AND_IGNORE_FAILURES.
+  UpdateInstanceBehavior update_instances;
+
+ protected:
+  explicit DirManagerOptions(const std::string& dir_type);
+};
+
+class DirManager {
+ public:
+  enum class LockMode {
+    MANDATORY,
+    OPTIONAL,
+    NONE,
+  };
+
+  // Returns the root names from the input 'root_list'.
+  static std::vector<std::string> GetRootNames(const CanonicalizedRootsList& root_list);
+
+  ~DirManager();
+
+  // Shuts down all directories' thread pools.
+  void Shutdown();
+
+  // Waits on all directories' thread pools.
+  void WaitOnClosures();
+
+  // Returns a list of all dirs.
+  const std::vector<std::unique_ptr<Dir>>& dirs() const {
+    return dirs_;
+  }
+
+  // Adds 'uuid_idx' to the set of failed directories. This directory will no
+  // longer be used. Logs an error message prefixed with 'error_message'
+  // describing what directories are affected.
+  //
+  // Returns an error if all directories have failed.
+  Status MarkDirFailed(int uuid_idx, const std::string& error_message = "");
+
+  // Fails the directory specified by 'uuid' and logs a warning if all
+  // directories have failed.
+  void MarkDirFailedByUuid(const std::string& uuid);
+
+  // Returns whether or not the 'uuid_idx' refers to a failed directory.
+  bool IsDirFailed(int uuid_idx) const;
+
+  // Returns whether the given tablet exists in a failed directory.
+  bool IsTabletInFailedDir(const std::string& tablet_id) const;
+
+  std::set<int> GetFailedDirs() const {
+    shared_lock<rw_spinlock> group_lock(dir_group_lock_.get_lock());
+    return failed_dirs_;
+  }
+
+  // Return a list of the canonicalized root directory names.
+  std::vector<std::string> GetRoots() const;
+
+  // Return a list of the canonicalized directory names.
+  std::vector<std::string> GetDirs() const;
+
+  // Finds a directory by uuid index, returning null if it can't be found.
+  //
+  // More information on uuid indexes and their relation to directories
+  // can be found next to DirSetPB in fs.proto.
+  Dir* FindDirByUuidIndex(int uuid_idx) const;
+
+  // Finds a uuid index by directory, returning false if it can't be found.
+  bool FindUuidIndexByDir(Dir* dir, int* uuid_idx) const;
+
+  // Finds a uuid index by root path, returning false if it can't be found.
+  bool FindUuidIndexByRoot(const std::string& root, int* uuid_idx) const;
+
+  // Finds a uuid index by UUID, returning false if it can't be found.
+  bool FindUuidIndexByUuid(const std::string& uuid, int* uuid_idx) const;
+
+  // Finds a UUID by canonicalized root name, returning false if it can't be found.
+  bool FindUuidByRoot(const std::string& root, std::string* uuid) const;
+
+  // Finds the set of tablet IDs that are registered to use the directory with
+  // the given UUID index.
+  std::set<std::string> FindTabletsByDirUuidIdx(int uuid_idx) const;
+
+  // Create a new directory using the appropriate directory implementation.
+  virtual std::unique_ptr<Dir> CreateNewDir(Env* env,
+                                            DirMetrics* metrics,
+                                            FsType fs_type,
+                                            std::string dir,
+                                            std::unique_ptr<DirInstanceMetadataFile>,
+                                            std::unique_ptr<ThreadPool> pool) = 0;
+
+ protected:
+  // The name to be used by this directory manager for each sub-directory of
+  // each directory root.
+  virtual const char* dir_name() const = 0;
+
+  // The name to be used by this directory manager for each instance file
+  // corresponding to this directory manager.
+  virtual const char* instance_metadata_filename() const = 0;
+
+  // Whether to sync the directories when updating this manager's directories.
+  virtual bool sync_dirs() const = 0;
+
+  // Whether to lock the directories to prevent concurrent usage. Note:
+  // read-only concurrent usage is still allowed.
+  virtual bool lock_dirs() const = 0;
+
+  // The max number of directories to be managed.
+  virtual int max_dirs() const = 0;
+
+  DirManager(Env* env,
+             std::unique_ptr<DirMetrics> dir_metrics,
+             int num_threads_per_dir,
+             const DirManagerOptions& opts,
+             CanonicalizedRootsList canonicalized_data_roots);
+
+  // Initializes the data directories on disk. Returns an error if initialized
+  // directories already exist.
+  //
+  // Note: this doesn't initialize any in-memory state for the directory
+  // manager.
+  virtual Status Create();
+
+  // Opens existing instance files from disk and indexes the files found.
+  //
+  // Returns an error if the number of on-disk directories found exceeds the
+  // max allowed, if locks need to be acquired and cannot be, or if there are
+  // no healthy directories.
+  //
+  // If appropriate, this will create any missing directories and rewrite
+  // existing instance files to be consistent with each other.
+  virtual Status Open();
+
+  // Populates the maps to index the given directories.
+  virtual Status PopulateDirectoryMaps(const std::vector<std::unique_ptr<Dir>>& dirs);
+
+  // Helper function to add a directory to the internal maps. Assumes that the
+  // UUID, UUID index, and directory name have not already been inserted.
+  void InsertToMaps(const std::string& uuid, int idx, Dir* dir);
+
+  // Loads the instance files for each directory root.
+  //
+  // On success, 'instance_files' contains instance objects, including those
+  // that failed to load because they were missing or because of a disk
+  // error; they are still considered "loaded" and are labeled unhealthy
+  // internally. 'has_existing_instances' is set to true if any of the instance
+  // files are healthy.
+  //
+  // Returns an error if an instance file fails in an irreconcilable way (e.g.
+  // the file is locked).
+  Status LoadInstances(
+      std::vector<std::unique_ptr<DirInstanceMetadataFile>>* instance_files,
+      bool* has_existing_instances);
+
+  // Takes the set of instance files, does some basic verification on them,
+  // creates any that don't exist on disk, and updates any that have a
+  // different set of UUIDs stored than the expected set.
+  //
+  // Returns an error if there is a configuration error, e.g. if the existing
+  // instances believe there should be a different block size.
+  //
+  // If in UPDATE_AND_IGNORE_FAILURES mode, an error is not returned in the event of a disk
+  // error. Instead, it is up to the caller to reload the instance files and
+  // proceed if healthy enough.
+  //
+  // If in UPDATE_AND_ERROR_ON_FAILURE mode, a failure to update instances will
+  // surface as an error.
+  Status CreateNewDirectoriesAndUpdateInstances(
+      std::vector<std::unique_ptr<DirInstanceMetadataFile>> instances);
+
+  // Updates the on-disk instance files specified by 'instances_to_update'
+  // (presumably those whose 'all_uuids' field doesn't match 'new_all_uuids')
+  // using the contents of 'new_all_uuids', skipping any unhealthy instance
+  // files.
+  //
+  // If in UPDATE_AND_IGNORE_FAILURES mode, this is best effort. If any of the instance
+  // updates fail (e.g. due to a disk error) in this mode, this will log a
+  // warning about the failed updates and return OK.
+  //
+  // If in UPDATE_AND_ERROR_ON_FAILURE mode, any failure will immediately attempt
+  // to clean up any altered state and return with an error.
+  Status UpdateHealthyInstances(
+      const std::vector<std::unique_ptr<DirInstanceMetadataFile>>& instances_to_update,
+      const std::set<std::string>& new_all_uuids);
+
+  // The environment to be used for all directory operations.
+  Env* env_;
+
+  // The number of threads to allocate per directory threadpool.
+  const int num_threads_per_dir_;
+
+  // The options that the DirManager was created with.
+  const DirManagerOptions opts_;
+
+  // The canonicalized roots provided to the constructor, taken verbatim.
+  // Common roots in the collections have been deduplicated.
+  const CanonicalizedRootsList canonicalized_fs_roots_;
+
+  // Directories tracked by this manager.
+  std::vector<std::unique_ptr<Dir>> dirs_;
+
+  // Set of metrics relating to the health of the directories that this manager
+  // is tracking.
+  std::unique_ptr<DirMetrics> metrics_;
+
+  // Lock protecting access to the directory group maps and to failed_dirs_. A
+  // percpu_rwlock is used so threads attempting to read (e.g. to get the next
+  // directory for an operation) do not block each other, while threads
+  // attempting to write (e.g. to create a new tablet, thereby registering
+  // directories per tablet) block all threads.
+  mutable percpu_rwlock dir_group_lock_;
+
+  // RNG used to select directories.
+  mutable ThreadSafeRandom rng_;
+
+  typedef std::unordered_map<std::string, std::string> UuidByRootMap;
+  UuidByRootMap uuid_by_root_;
+
+  typedef std::unordered_map<int, Dir*> UuidIndexMap;
+  UuidIndexMap dir_by_uuid_idx_;
+
+  typedef std::unordered_map<Dir*, int> ReverseUuidIndexMap;
+  ReverseUuidIndexMap uuid_idx_by_dir_;
+
+  typedef std::unordered_map<int, std::set<std::string>> TabletsByUuidIndexMap;
+  TabletsByUuidIndexMap tablets_by_uuid_idx_map_;
+
+  UuidByUuidIndexMap uuid_by_idx_;
+  UuidIndexByUuidMap idx_by_uuid_;
+
+  typedef std::set<int> FailedDirSet;
+  FailedDirSet failed_dirs_;
+
+  DISALLOW_COPY_AND_ASSIGN(DirManager);
+};
+
+} // namespace fs
+} // namespace kudu
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index 046d734..f3f08f5 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -147,7 +147,7 @@ class FsErrorManager {
   void RunErrorNotificationCb(ErrorHandlerType e, const std::string& uuid) const;
 
   // Runs the error notification callback with the UUID of 'dir'.
-  void RunErrorNotificationCb(ErrorHandlerType e, const DataDir* dir) const {
+  void RunErrorNotificationCb(ErrorHandlerType e, const Dir* dir) const {
     DCHECK_EQ(e, ErrorHandlerType::DISK_ERROR);
     RunErrorNotificationCb(e, dir->instance()->uuid());
   }
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 9ec4471..cef55c1 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -33,6 +33,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/gutil/bind.h"
@@ -98,16 +99,16 @@ class FileBlockLocation {
   }
 
   // Construct a location from its constituent parts.
-  static FileBlockLocation FromParts(DataDir* data_dir,
+  static FileBlockLocation FromParts(Dir* data_dir,
                                      int data_dir_idx,
                                      const BlockId& block_id);
 
   // Construct a location from a full block ID.
-  static FileBlockLocation FromBlockId(DataDir* data_dir,
+  static FileBlockLocation FromBlockId(Dir* data_dir,
                                        const BlockId& block_id);
 
   // Get the data dir index of a given block ID.
-  static int GetDataDirIdx(const BlockId& block_id) {
+  static int GetDirIdx(const BlockId& block_id) {
     return block_id.id() >> 48;
   }
 
@@ -128,11 +129,11 @@ class FileBlockLocation {
   void GetAllParentDirs(vector<string>* parent_dirs) const;
 
   // Simple accessors.
-  DataDir* data_dir() const { return data_dir_; }
+  Dir* data_dir() const { return data_dir_; }
   const BlockId& block_id() const { return block_id_; }
 
  private:
-  FileBlockLocation(DataDir* data_dir, BlockId block_id)
+  FileBlockLocation(Dir* data_dir, BlockId block_id)
       : data_dir_(data_dir), block_id_(block_id) {}
 
   // These per-byte accessors yield subdirectories in which blocks are grouped.
@@ -149,11 +150,11 @@ class FileBlockLocation {
                         (block_id_.id() & 0x00000000FF000000ULL) >> 24);
   }
 
-  DataDir* data_dir_;
+  Dir* data_dir_;
   BlockId block_id_;
 };
 
-FileBlockLocation FileBlockLocation::FromParts(DataDir* data_dir,
+FileBlockLocation FileBlockLocation::FromParts(Dir* data_dir,
                                                int data_dir_idx,
                                                const BlockId& block_id) {
   DCHECK_LT(data_dir_idx, kuint16max);
@@ -165,7 +166,7 @@ FileBlockLocation FileBlockLocation::FromParts(DataDir* data_dir,
   return FileBlockLocation(data_dir, BlockId(combined_id));
 }
 
-FileBlockLocation FileBlockLocation::FromBlockId(DataDir* data_dir,
+FileBlockLocation FileBlockLocation::FromBlockId(Dir* data_dir,
                                                  const BlockId& block_id) {
   return FileBlockLocation(data_dir, block_id);
 }
@@ -341,7 +342,7 @@ Status FileWritableBlock::AppendV(ArrayView<const Slice> data) {
   DCHECK(state_ == CLEAN || state_ == DIRTY) << "Invalid state: " << state_;
   RETURN_NOT_OK_HANDLE_ERROR(writer_->AppendV(data));
   RETURN_NOT_OK_HANDLE_ERROR(location_.data_dir()->RefreshAvailableSpace(
-      DataDir::RefreshMode::ALWAYS));
+      Dir::RefreshMode::ALWAYS));
   state_ = DIRTY;
 
   // Calculate the amount of data written
@@ -471,8 +472,8 @@ class FileReadableBlock : public ReadableBlock {
 };
 
 void FileReadableBlock::HandleError(const Status& s) const {
-  const DataDir* dir = block_manager_->dd_manager_->FindDataDirByUuidIndex(
-      internal::FileBlockLocation::GetDataDirIdx(block_id_));
+  const Dir* dir = block_manager_->dd_manager_->FindDirByUuidIndex(
+      internal::FileBlockLocation::GetDirIdx(block_id_));
   HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(
       ErrorHandlerType::DISK_ERROR, dir));
 }
@@ -682,8 +683,8 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
 
 bool FileBlockManager::FindBlockPath(const BlockId& block_id,
                                      string* path) const {
-  DataDir* dir = dd_manager_->FindDataDirByUuidIndex(
-      internal::FileBlockLocation::GetDataDirIdx(block_id));
+  Dir* dir = dd_manager_->FindDirByUuidIndex(
+      internal::FileBlockLocation::GetDirIdx(block_id));
   if (dir) {
     *path = internal::FileBlockLocation::FromBlockId(
         dir, block_id).GetFullPath();
@@ -719,15 +720,15 @@ Status FileBlockManager::Open(FsReport* report) {
 
   // Prepare the filesystem report and either return or log it.
   FsReport local_report;
-  set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  set<int> failed_dirs = dd_manager_->GetFailedDirs();
+  for (const auto& dd : dd_manager_->dirs()) {
     // Don't report failed directories.
     // TODO(KUDU-2111): currently the FsReport only reports on containers for
     // the log block manager. Implement some sort of reporting for failed
     // directories as well.
     if (PREDICT_FALSE(!failed_dirs.empty())) {
       int uuid_idx;
-      CHECK(dd_manager_->FindUuidIndexByDataDir(dd.get(), &uuid_idx));
+      CHECK(dd_manager_->FindUuidIndexByDir(dd.get(), &uuid_idx));
       if (ContainsKey(failed_dirs, uuid_idx)) {
         continue;
       }
@@ -747,11 +748,11 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                      unique_ptr<WritableBlock>* block) {
   CHECK(!opts_.read_only);
 
-  DataDir* dir;
+  Dir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
   int uuid_idx;
-  CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
+  CHECK(dd_manager_->FindUuidIndexByDir(dir, &uuid_idx));
 
   string path;
   vector<string> created_dirs;
@@ -814,8 +815,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, \
-      dd_manager_->FindDataDirByUuidIndex( \
-      internal::FileBlockLocation::GetDataDirIdx(block_id)))); \
+      dd_manager_->FindDirByUuidIndex( \
+      internal::FileBlockLocation::GetDirIdx(block_id)))); \
 } while (0)
 
 Status FileBlockManager::OpenBlock(const BlockId& block_id,
@@ -838,9 +839,9 @@ Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
   CHECK(!opts_.read_only);
 
   // Return early if deleting a block in a failed directory.
-  set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
+  set<int> failed_dirs = dd_manager_->GetFailedDirs();
   if (PREDICT_FALSE(!failed_dirs.empty())) {
-    int uuid_idx = internal::FileBlockLocation::GetDataDirIdx(block_id);
+    int uuid_idx = internal::FileBlockLocation::GetDirIdx(block_id);
     if (ContainsKey(failed_dirs, uuid_idx)) {
       LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting",
                                           block_id.ToString());
@@ -880,7 +881,7 @@ shared_ptr<BlockDeletionTransaction> FileBlockManager::NewDeletionTransaction()
 
 namespace {
 
-Status GetAllBlockIdsForDataDirCb(DataDir* dd,
+Status GetAllBlockIdsForDataDirCb(Dir* dd,
                                   vector<BlockId>* block_ids,
                                   Env::FileType file_type,
                                   const string& dirname,
@@ -910,8 +911,8 @@ Status GetAllBlockIdsForDataDirCb(DataDir* dd,
   return Status::OK();
 }
 
-void GetAllBlockIdsForDataDir(Env* env,
-                              DataDir* dd,
+void GetAllBlockIdsForDir(Env* env,
+                              Dir* dd,
                               vector<BlockId>* block_ids,
                               Status* status) {
   *status = env->Walk(dd->dir(), Env::PRE_ORDER,
@@ -921,7 +922,7 @@ void GetAllBlockIdsForDataDir(Env* env,
 } // anonymous namespace
 
 Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
-  const auto& dds = dd_manager_->data_dirs();
+  const auto& dds = dd_manager_->dirs();
   block_ids->clear();
 
   // The FBM does not maintain block listings in memory, so off we go to the
@@ -929,13 +930,13 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
   vector<vector<BlockId>> block_id_vecs(dds.size());
   vector<Status> statuses(dds.size());
   for (int i = 0; i < dds.size(); i++) {
-    dds[i]->ExecClosure(Bind(&GetAllBlockIdsForDataDir,
+    dds[i]->ExecClosure(Bind(&GetAllBlockIdsForDir,
                              env_,
                              dds[i].get(),
                              &block_id_vecs[i],
                              &statuses[i]));
   }
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  for (const auto& dd : dd_manager_->dirs()) {
     dd->WaitOnClosures();
   }
 
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 0a9fc13..03c2bc1 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -39,6 +39,7 @@
 
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
@@ -444,7 +445,7 @@ TEST_F(FsManagerTestBase, TestOpenWithDuplicateInstanceFiles) {
   const string duplicate_dir_instance = JoinPathSegments(
       duplicate_test_dir, kInstanceMetadataFileName);
   ASSERT_OK(env_util::CopyFile(env_,
-        fs_manager()->dd_manager()->FindDataDirByUuidIndex(0)->instance()->path(),
+        fs_manager()->dd_manager()->FindDirByUuidIndex(0)->instance()->path(),
         duplicate_dir_instance, wr_opts));
 
   // This is disallowed, as each directory should have its own unique UUID.
@@ -475,7 +476,7 @@ TEST_F(FsManagerTestBase, TestOpenWithNoBlockManagerInstances) {
     new_opts.update_instances = check_behavior;
     ReinitFsManagerWithOpts(new_opts);
     Status s = fs_manager()->Open();
-    ASSERT_STR_CONTAINS(s.ToString(), "no healthy data directories found");
+    ASSERT_STR_CONTAINS(s.ToString(), "no healthy directories found");
     ASSERT_TRUE(s.IsNotFound());
 
     // Once we supply the WAL directory as a data directory, we can open successfully.
@@ -529,7 +530,7 @@ TEST_F(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
   }
 
   ASSERT_OK(s);
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Now remove the new directory from disk. Kudu should start up with the
   // empty disk and attempt to use it. Upon opening the FS layout, we should
@@ -538,7 +539,7 @@ TEST_F(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
   ASSERT_OK(env_->DeleteRecursively(new_root));
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Even at the same mountpoint, the directory will be assigned a new UUID.
   string new_root_uuid_post_update;
@@ -572,7 +573,7 @@ TEST_F(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
 
   // ...except we should be able to successfully create a new FS layout.
   ASSERT_OK(fs_manager()->CreateInitialFileSystemLayout());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 }
 
 // When we canonicalize a directory, we actually canonicalize the directory's
@@ -598,7 +599,7 @@ TEST_F(FsManagerTestBase, TestOpenWithCanonicalizationFailure) {
   FLAGS_env_inject_eio_globs = JoinPathSegments(dir2, "**");
   FLAGS_env_inject_eio = 1.0;
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
   FLAGS_env_inject_eio = 0;
 
   // Now fail the canonicalization by deleting a parent directory. This
@@ -614,13 +615,13 @@ TEST_F(FsManagerTestBase, TestOpenWithCanonicalizationFailure) {
     return;
   }
   ASSERT_OK(s);
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Let's try that again, but with the appropriate mountpoint/directory.
   ASSERT_OK(env_->CreateDir(dir2));
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 }
 
 TEST_F(FsManagerTestBase, TestTmpFilesCleanup) {
@@ -763,24 +764,24 @@ TEST_F(FsManagerTestBase, TestAddRemoveDataDirs) {
   opts.data_roots = { fs_root_, new_path1 };
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDataDirs().size());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Try to open with a data dir removed; this should succeed, and Kudu should
   // open with only a single data directory.
   opts.data_roots = { fs_root_ };
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetDataDirs().size());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // We should be able to add new directories anywhere in the list.
   const string new_path2 = GetTestPath("new_path2");
   opts.data_roots = { new_path2, fs_root_ };
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDataDirs().size());
-  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Open the FS layout with an existing, failed data dir; this should be fine,
   // but should report a single failed directory.
@@ -788,7 +789,7 @@ TEST_F(FsManagerTestBase, TestAddRemoveDataDirs) {
   FLAGS_env_inject_eio_globs = JoinPathSegments(new_path2, "**");
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
 }
 
 TEST_F(FsManagerTestBase, TestEIOWhileChangingDirs) {
@@ -934,7 +935,7 @@ TEST_F(FsManagerTestBase, TestReAddRemovedDataDir) {
     ReinitFsManagerWithOpts(opts);
     ASSERT_OK(fs_manager()->Open());
     DataDirManager* dd_manager = fs_manager()->dd_manager();
-    ASSERT_EQ(data_roots.size(), dd_manager->GetDataDirs().size());
+    ASSERT_EQ(data_roots.size(), dd_manager->GetDirs().size());
 
     // Since we haven't deleted any directories or instance files, ensure that
     // our UUIDs match across startups.
@@ -1003,22 +1004,22 @@ TEST_F(FsManagerTestBase, TestAddRemoveSpeculative) {
   opts.update_instances = UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES;
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDataDirs().size());
+  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
 
   // Create a 'speculative' FsManager with the second data directory removed.
   opts.data_roots = { fs_root_ };
   opts.update_instances = UpdateInstanceBehavior::DONT_UPDATE;
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetDataDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetDirs().size());
 
   // Do the same thing, but with a new data directory added.
   const string new_path2 = GetTestPath("new_path2");
   opts.data_roots = { fs_root_, new_path1, new_path2 };
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
-  ASSERT_EQ(3, fs_manager()->dd_manager()->GetDataDirs().size());
-  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+  ASSERT_EQ(3, fs_manager()->dd_manager()->GetDirs().size());
+  ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
 
   // Neither of those attempts should have changed the on-disk state. Verify
   // this by retrying all three combinations again.
@@ -1031,7 +1032,7 @@ TEST_F(FsManagerTestBase, TestAddRemoveSpeculative) {
     ReinitFsManagerWithOpts(opts);
     ASSERT_OK(fs_manager()->Open());
     ASSERT_EQ(data_roots.size() == 3 ? 1 : 0,
-              fs_manager()->dd_manager()->GetFailedDataDirs().size());
+              fs_manager()->dd_manager()->GetFailedDirs().size());
   }
 
   // When we allow ourselves to update the disk instances, each open will
@@ -1041,14 +1042,14 @@ TEST_F(FsManagerTestBase, TestAddRemoveSpeculative) {
     opts.data_roots = data_roots;
     ReinitFsManagerWithOpts(opts);
     ASSERT_OK(fs_manager()->Open());
-    ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+    ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
 
     // Since the on-disk state has been updated, we should be able to open the
     // speculative directory with no issues.
     opts.update_instances = UpdateInstanceBehavior::DONT_UPDATE;
     ReinitFsManagerWithOpts(opts);
     ASSERT_OK(fs_manager()->Open());
-    ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDataDirs().size());
+    ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
   }
 }
 
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 356ceda..8c53010 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -408,12 +408,12 @@ Status FsManager::Open(FsReport* report) {
   if (!dd_manager_) {
     DataDirManagerOptions dm_opts;
     dm_opts.metric_entity = opts_.metric_entity;
-    dm_opts.block_manager_type = opts_.block_manager_type;
     dm_opts.read_only = opts_.read_only;
+    dm_opts.dir_type = opts_.block_manager_type;
     dm_opts.update_instances = opts_.update_instances;
     LOG_TIMING(INFO, "opening directory manager") {
       RETURN_NOT_OK(DataDirManager::OpenExisting(env_,
-          canonicalized_data_fs_roots_, std::move(dm_opts), &dd_manager_));
+          canonicalized_data_fs_roots_, dm_opts, &dd_manager_));
     }
   }
 
@@ -427,7 +427,7 @@ Status FsManager::Open(FsReport* report) {
 
   // Set an initial error handler to mark data directories as failed.
   error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
-      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
+      Bind(&DataDirManager::MarkDirFailedByUuid, Unretained(dd_manager_.get())));
 
   // Finally, initialize and open the block manager.
   InitBlockManager();
@@ -517,7 +517,7 @@ Status FsManager::CreateInitialFileSystemLayout(boost::optional<string> uuid) {
   dm_opts.read_only = opts_.read_only;
   LOG_TIMING(INFO, "creating directory manager") {
     RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew(
-        env_, canonicalized_data_fs_roots_, std::move(dm_opts), &dd_manager_),
+        env_, canonicalized_data_fs_roots_, dm_opts, &dd_manager_),
                           "Unable to create directory manager");
   }
 
@@ -625,7 +625,7 @@ const string& FsManager::uuid() const {
 
 vector<string> FsManager::GetDataRootDirs() const {
   // Get the data subdirectory for each data root.
-  return dd_manager_->GetDataDirs();
+  return dd_manager_->GetDirs();
 }
 
 string FsManager::GetTabletMetadataDir() const {
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 6c61580..7406be0 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -29,7 +29,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
-#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -50,6 +50,7 @@ class MemTracker;
 namespace fs {
 
 class BlockManager;
+class DataDirManager;
 class FsManagerTestBase_TestDuplicatePaths_Test;
 class FsManagerTestBase_TestEIOWhileRunningUpdateDirsTool_Test;
 class FsManagerTestBase_TestIsolatedMetadataDir_Test;
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index b1eec55..7166516 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -22,6 +22,7 @@
 #include <cstdlib>
 #include <cstring>
 #include <deque>
+#include <initializer_list>
 #include <memory>
 #include <ostream>
 #include <set>
@@ -33,13 +34,13 @@
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
@@ -236,7 +237,7 @@ class LogBlockManagerTest : public KuduTest {
     // Populate 'data_files' and 'metadata_files'.
     vector<string> data_files;
     vector<string> metadata_files;
-    for (const string& data_dir : dd_manager_->GetDataDirs()) {
+    for (const string& data_dir : dd_manager_->GetDirs()) {
       vector<string> children;
       ASSERT_OK(env_->GetChildren(data_dir, &children));
       for (const string& child : children) {
@@ -425,7 +426,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
           {0, &METRIC_log_block_manager_dead_containers_deleted} }));
   }
   // Wait for the actual hole punching to take place.
-  for (const auto& data_dir : dd_manager_->data_dirs()) {
+  for (const auto& data_dir : dd_manager_->dirs()) {
     data_dir->WaitOnClosures();
   }
   NO_FATALS(CheckLogMetrics(new_entity,
@@ -472,7 +473,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
           {0, &METRIC_log_block_manager_dead_containers_deleted} }));
   }
   // Wait for the actual hole punching to take place.
-  for (const auto& data_dir : dd_manager_->data_dirs()) {
+  for (const auto& data_dir : dd_manager_->dirs()) {
     data_dir->WaitOnClosures();
   }
   NO_FATALS(CheckLogMetrics(new_entity,
@@ -1167,7 +1168,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add a mixture of regular and misaligned blocks to it.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   int num_misaligned_blocks = 0;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -1285,7 +1286,7 @@ TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
   NO_FATALS(GetContainerNames(&container_names));
 
   // Corrupt one container.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   ASSERT_OK(corruptor.PreallocateFullContainer());
 
@@ -1330,7 +1331,7 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_EQ(0, file_size_on_disk);
 
   // Add some "unpunched blocks" to the container.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor.AddUnpunchedBlockToFullContainer());
@@ -1371,7 +1372,7 @@ TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
   // Create some incomplete containers. The corruptor will select between
   // several variants of "incompleteness" at random (see
   // LBMCorruptor::CreateIncompleteContainer() for details).
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumContainers; i++) {
     ASSERT_OK(corruptor.CreateIncompleteContainer());
@@ -1409,7 +1410,7 @@ TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
   // Add some malformed records. The corruptor will select between
   // several variants of "malformedness" at random (see
   // LBMCorruptor::AddMalformedRecordToContainer for details).
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor.AddMalformedRecordToContainer());
@@ -1439,7 +1440,7 @@ TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add some misaligned blocks.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
@@ -1478,7 +1479,7 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
   ASSERT_EQ(kNumContainers, container_names.size());
 
   // Add some partial records.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor.AddPartialRecordToContainer());
@@ -1681,7 +1682,7 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
 
   // Wire in a callback to fail data directories.
   test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
-      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
+      Bind(&DataDirManager::MarkDirFailedByUuid, Unretained(dd_manager_.get())));
   bm_.reset(CreateBlockManager(nullptr));
 
   // Fail one of the directories, chosen randomly.
@@ -1697,7 +1698,7 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
   for (const string& data_dir : report.data_dirs) {
     ASSERT_NE(data_dir, test_dirs[failed_idx]);
   }
-  const set<int>& failed_dirs = dd_manager_->GetFailedDataDirs();
+  const set<int>& failed_dirs = dd_manager_->GetFailedDirs();
   ASSERT_EQ(1, failed_dirs.size());
 
   int uuid_idx;
@@ -1941,7 +1942,7 @@ TEST_F(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
       }
       ASSERT_OK(transaction->CommitDeletedBlocks(&deleted));
       transaction.reset();
-      for (const auto& data_dir : dd_manager_->data_dirs()) {
+      for (const auto& data_dir : dd_manager_->dirs()) {
         data_dir->WaitOnClosures();
       }
     }
@@ -2007,7 +2008,7 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
     transaction->AddDeletedBlock(block_id);
     ASSERT_OK(transaction->CommitDeletedBlocks(&deleted));
     transaction.reset();
-    for (const auto& data_dir : dd_manager_->data_dirs()) {
+    for (const auto& data_dir : dd_manager_->dirs()) {
       data_dir->WaitOnClosures();
     }
   };
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index af182f7..b6c4c4b 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -20,7 +20,6 @@
 #include <errno.h>
 
 #include <algorithm>
-#include <cerrno>
 #include <cstddef>
 #include <cstdint>
 #include <map>
@@ -40,6 +39,7 @@
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs.pb.h"
@@ -364,7 +364,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
 
   // Creates a new block container in 'dir'.
   static Status Create(LogBlockManager* block_manager,
-                       DataDir* dir,
+                       Dir* dir,
                        LogBlockContainerRefPtr* container);
 
   // Opens an existing block container in 'dir'.
@@ -377,7 +377,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // both appear to have no data (e.g. due to a crash just after creating
   // one of them but before writing any records). This is recorded in 'report'.
   static Status Open(LogBlockManager* block_manager,
-                     DataDir* dir,
+                     Dir* dir,
                      FsReport* report,
                      const string& id,
                      LogBlockContainerRefPtr* container);
@@ -540,7 +540,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   }
   bool dead() const { return dead_.Load(); }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
-  DataDir* data_dir() const { return data_dir_; }
+  Dir* data_dir() const { return data_dir_; }
   const DirInstanceMetadataPB* instance() const { return data_dir_->instance()->metadata(); }
 
   // Adjusts the number of blocks being written.
@@ -575,7 +575,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   }
 
  private:
-  LogBlockContainer(LogBlockManager* block_manager, DataDir* data_dir,
+  LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir,
                     unique_ptr<WritablePBContainerFile> metadata_file,
                     shared_ptr<RWFile> data_file);
 
@@ -597,7 +597,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // Note: the status here only represents the result of check.
   static Status CheckContainerFiles(LogBlockManager* block_manager,
                                     FsReport* report,
-                                    const DataDir* dir,
+                                    const Dir* dir,
                                     const string& common_path,
                                     const string& data_path,
                                     const string& metadata_path);
@@ -637,7 +637,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   LogBlockManager* const block_manager_;
 
   // The data directory where the container lives.
-  DataDir* data_dir_;
+  Dir* data_dir_;
 
   const boost::optional<int64_t> max_num_blocks_;
 
@@ -695,7 +695,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
 
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager,
-    DataDir* data_dir,
+    Dir* data_dir,
     unique_ptr<WritablePBContainerFile> metadata_file,
     shared_ptr<RWFile> data_file)
     : block_manager_(block_manager),
@@ -739,7 +739,7 @@ void LogBlockContainer::HandleError(const Status& s) const {
 } while (0)
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
-                                 DataDir* dir,
+                                 Dir* dir,
                                  LogBlockContainerRefPtr* container) {
   string common_path;
   string metadata_path;
@@ -807,7 +807,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
-                               DataDir* dir,
+                               Dir* dir,
                                FsReport* report,
                                const string& id,
                                LogBlockContainerRefPtr* container) {
@@ -845,7 +845,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
 Status LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager,
                                               FsReport* report,
-                                              const DataDir* dir,
+                                              const Dir* dir,
                                               const string& common_path,
                                               const string& data_path,
                                               const string& metadata_path) {
@@ -1140,7 +1140,7 @@ Status LogBlockContainer::WriteVData(int64_t offset, ArrayView<const Slice> data
                                   return sum + curr.size();
                                 });
   if (offset + data_size > preallocated_offset_) {
-    RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS));
+    RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS));
   }
   return Status::OK();
 }
@@ -1225,7 +1225,7 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
     int64_t off = std::max(preallocated_offset_, block_start_offset);
     int64_t len = FLAGS_log_container_preallocate_bytes;
     RETURN_NOT_OK_HANDLE_ERROR(data_file_->PreAllocate(off, len, RWFile::CHANGE_FILE_SIZE));
-    RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS));
+    RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS));
     VLOG(2) << Substitute("Preallocated $0 bytes at offset $1 in container $2",
                           len, off, ToString());
 
@@ -1990,7 +1990,7 @@ Status LogBlockManager::Open(FsReport* report) {
 
   // Establish (and log) block limits for each data directory using kernel,
   // filesystem, and gflags information.
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  for (const auto& dd : dd_manager_->dirs()) {
     boost::optional<int64_t> limit;
     if (FLAGS_log_container_max_blocks == -1) {
       // No limit, unless this is KUDU-1508.
@@ -1998,7 +1998,7 @@ Status LogBlockManager::Open(FsReport* report) {
       // The log block manager requires hole punching and, of the ext
       // filesystems, only ext4 supports it. Thus, if this is an ext
       // filesystem, it's ext4 by definition.
-      if (buggy_el6_kernel_ && dd->fs_type() == DataDirFsType::EXT) {
+      if (buggy_el6_kernel_ && dd->fs_type() == FsType::EXT) {
         uint64_t fs_block_size =
             dd->instance()->metadata()->filesystem_block_size_bytes();
         bool untested_block_size =
@@ -2028,17 +2028,17 @@ Status LogBlockManager::Open(FsReport* report) {
   }
 
   // Open containers in each data dirs.
-  vector<Status> statuses(dd_manager_->data_dirs().size());
+  vector<Status> statuses(dd_manager_->dirs().size());
   vector<vector<unique_ptr<internal::LogBlockContainerLoadResult>>> container_results(
-      dd_manager_->data_dirs().size());
+      dd_manager_->dirs().size());
   int i = -1;
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  for (const auto& dd : dd_manager_->dirs()) {
     i++;
     int uuid_idx;
-    CHECK(dd_manager_->FindUuidIndexByDataDir(dd.get(), &uuid_idx));
+    CHECK(dd_manager_->FindUuidIndexByDir(dd.get(), &uuid_idx));
     // TODO(awong): store Statuses for each directory in the directory manager
     // so we can avoid these artificial Statuses.
-    if (dd_manager_->IsDataDirFailed(uuid_idx)) {
+    if (dd_manager_->IsDirFailed(uuid_idx)) {
       statuses[i] = Status::IOError("Data directory failed", "", EIO);
       continue;
     }
@@ -2053,16 +2053,16 @@ Status LogBlockManager::Open(FsReport* report) {
   }
 
   // Wait for the opens to complete.
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  for (const auto& dd : dd_manager_->dirs()) {
     dd->WaitOnClosures();
   }
 
   // Check load errors and merge each data dir's container load results, then do repair tasks.
   vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results(
-      dd_manager_->data_dirs().size());
-  for (int i = 0; i < dd_manager_->data_dirs().size(); ++i) {
+      dd_manager_->dirs().size());
+  for (int i = 0; i < dd_manager_->dirs().size(); ++i) {
     const auto& s = statuses[i];
-    const auto& dd = dd_manager_->data_dirs()[i];
+    const auto& dd = dd_manager_->dirs()[i];
     RETURN_ON_NON_DISK_FAILURE(dd, s);
     // If open dir error, do not try to repair.
     if (PREDICT_FALSE(!s.ok())) {
@@ -2105,12 +2105,12 @@ Status LogBlockManager::Open(FsReport* report) {
   }
 
   // Wait for the repair tasks to complete.
-  for (const auto& dd : dd_manager_->data_dirs()) {
+  for (const auto& dd : dd_manager_->dirs()) {
     dd->WaitOnClosures();
   }
 
   FsReport merged_report;
-  for (int i = 0; i < dd_manager_->data_dirs().size(); ++i) {
+  for (int i = 0; i < dd_manager_->dirs().size(); ++i) {
     if (PREDICT_FALSE(!dir_results[i])) {
       continue;
     }
@@ -2118,10 +2118,10 @@ Status LogBlockManager::Open(FsReport* report) {
       merged_report.MergeFrom(dir_results[i]->report);
       continue;
     }
-    RETURN_ON_NON_DISK_FAILURE(dd_manager_->data_dirs()[i], dir_results[i]->status);
+    RETURN_ON_NON_DISK_FAILURE(dd_manager_->dirs()[i], dir_results[i]->status);
   }
 
-  if (dd_manager_->GetFailedDataDirs().size() == dd_manager_->data_dirs().size()) {
+  if (dd_manager_->GetFailedDirs().size() == dd_manager_->dirs().size()) {
     return Status::IOError("All data dirs failed to open", "", EIO);
   }
 
@@ -2246,7 +2246,7 @@ void LogBlockManager::RemoveDeadContainer(const string& container_name) {
 
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainerRefPtr* container) {
-  DataDir* dir;
+  Dir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
 
@@ -2454,10 +2454,10 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& block_id,
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, container->data_dir()));
 
   // Return early if deleting a block in a failed directory.
-  set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
+  set<int> failed_dirs = dd_manager_->GetFailedDirs();
   if (PREDICT_FALSE(!failed_dirs.empty())) {
     int uuid_idx;
-    CHECK(dd_manager_->FindUuidIndexByDataDir(container->data_dir(), &uuid_idx));
+    CHECK(dd_manager_->FindUuidIndexByDir(container->data_dir(), &uuid_idx));
     if (ContainsKey(failed_dirs, uuid_idx)) {
       LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting",
                                           block_id.ToString());
@@ -2473,7 +2473,7 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& block_id,
 }
 
 void LogBlockManager::OpenDataDir(
-    DataDir* dir,
+    Dir* dir,
     vector<unique_ptr<internal::LogBlockContainerLoadResult>>* results,
     Status* result_status) {
   // Find all containers and open them.
@@ -2526,7 +2526,7 @@ void LogBlockManager::OpenDataDir(
   }
 }
 
-void LogBlockManager::LoadContainer(DataDir* dir,
+void LogBlockManager::LoadContainer(Dir* dir,
                                     LogBlockContainerRefPtr container,
                                     internal::LogBlockContainerLoadResult* result) {
   // Process the records, building a container-local map for live blocks and
@@ -2709,7 +2709,7 @@ void LogBlockManager::LoadContainer(DataDir* dir,
   }
 }
 
-void LogBlockManager::RepairTask(DataDir* dir, internal::LogBlockContainerLoadResult* result) {
+void LogBlockManager::RepairTask(Dir* dir, internal::LogBlockContainerLoadResult* result) {
   result->status = Repair(dir,
                           &result->report,
                           std::move(result->need_repunching_blocks),
@@ -2732,7 +2732,7 @@ void LogBlockManager::RepairTask(DataDir* dir, internal::LogBlockContainerLoadRe
 } while (0)
 
 Status LogBlockManager::Repair(
-    DataDir* dir,
+    Dir* dir,
     FsReport* report,
     vector<LogBlockRefPtr> need_repunching,
     vector<LogBlockContainerRefPtr> dead_containers,
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 3c26f90..016b597 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -49,7 +49,7 @@ class Env;
 class RWFile;
 
 namespace fs {
-class DataDir;
+class Dir;
 class DataDirManager;
 class FsErrorManager;
 struct FsReport;
@@ -330,7 +330,7 @@ class LogBlockManager : public BlockManager {
                         LogBlockRefPtr* lb);
 
   // Simple wrapper of Repair(), used as a runnable function in thread.
-  void RepairTask(DataDir* dir, internal::LogBlockContainerLoadResult* result);
+  void RepairTask(Dir* dir, internal::LogBlockContainerLoadResult* result);
 
   // Repairs any inconsistencies for 'dir' described in 'report'.
   //
@@ -341,7 +341,7 @@ class LogBlockManager : public BlockManager {
   //    files compacted.
   //
   // Returns an error if repairing a fatal inconsistency failed.
-  Status Repair(DataDir* dir,
+  Status Repair(Dir* dir,
                 FsReport* report,
                 std::vector<LogBlockRefPtr> need_repunching,
                 std::vector<LogBlockContainerRefPtr> dead_containers,
@@ -365,13 +365,13 @@ class LogBlockManager : public BlockManager {
   // results of consistency checking are written to 'results'.
   //
   // Success or failure is set in 'result_status'.
-  void OpenDataDir(DataDir* dir,
+  void OpenDataDir(Dir* dir,
                    std::vector<std::unique_ptr<internal::LogBlockContainerLoadResult>>* results,
                    Status* result_status);
 
   // Reads records from one log block container in the data directory.
   // The result details will be collected into 'result'.
-  void LoadContainer(DataDir* dir,
+  void LoadContainer(Dir* dir,
                      LogBlockContainerRefPtr container,
                      internal::LogBlockContainerLoadResult* result);
 
@@ -438,7 +438,7 @@ class LogBlockManager : public BlockManager {
 
   // Maps a data directory to an upper bound on the number of blocks that a
   // container residing in that directory should observe, if one is necessary.
-  std::unordered_map<const DataDir*,
+  std::unordered_map<const Dir*,
                      boost::optional<int64_t>> block_limits_by_data_dir_;
 
   // Manages files opened for reading.
@@ -452,7 +452,7 @@ class LogBlockManager : public BlockManager {
   // excluding containers that are either in use or full.
   //
   // Does not own the containers.
-  std::unordered_map<const DataDir*,
+  std::unordered_map<const Dir*,
                      std::deque<LogBlockContainerRefPtr>> available_containers_by_data_dir_;
 
   // Tracks dirty container directories.
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 49f9d50..16bd409 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -23,6 +23,7 @@
 #include <cstdio>
 #include <fstream>
 #include <iterator>
+#include <initializer_list>
 #include <map>
 #include <memory>
 #include <set>
@@ -4927,7 +4928,7 @@ TEST_F(ToolTest, TestFsSwappingDirectoriesFailsGracefully) {
   Status s = RunTool(Substitute(
       "fs update_dirs --fs_wal_dir=$0 --fs_data_dirs=$1",
       wal_root, new_data_root_no_wal), nullptr, &stderr);
-  ASSERT_STR_CONTAINS(stderr, "no healthy data directories found");
+  ASSERT_STR_CONTAINS(stderr, "no healthy directories found");
 
   // If we instead try to add the directory to the existing list of
   // directories, Kudu should allow it.
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index a12782c..943966b 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -16,17 +16,17 @@
 // under the License.
 #include "kudu/tserver/tablet_copy_client.h"
 
+#include <stdlib.h>
+
 #include <cstdint>
 #include <limits>
 #include <memory>
 #include <ostream>
-#include <stdlib.h>
 #include <string>
 #include <thread>
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -399,7 +399,7 @@ TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
   // metadata directory).
   while (true) {
     if (rand() % 10 == 0) {
-      dd_manager->MarkDataDirFailed(1, "injected failure in non-client thread");
+      dd_manager->MarkDirFailed(1, "injected failure in non-client thread");
       LOG(INFO) << "INJECTING FAILURE";
       break;
     }
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
index 2658d9f..1dc0db6 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -14,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "kudu/tserver/tablet_copy-test-base.h"
 
 #include <atomic>
 #include <cstdint>
@@ -27,6 +26,7 @@
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <google/protobuf/stubs/port.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/wire_protocol.h"
@@ -46,6 +46,7 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_copy-test-base.h"
 #include "kudu/tserver/tablet_copy.pb.h"
 #include "kudu/tserver/tablet_copy.proxy.h"
 #include "kudu/tserver/tablet_server.h"
@@ -530,7 +531,7 @@ TEST_F(TabletCopyServiceTest, TestDiskFailureDuringSession) {
   // Copy over the block while one of the directories is failed.
   FetchDataResponsePB resp;
   RpcController controller;
-  ASSERT_OK(mini_server_->server()->fs_manager()->dd_manager()->MarkDataDirFailed(1));
+  ASSERT_OK(mini_server_->server()->fs_manager()->dd_manager()->MarkDirFailed(1));
   Status s = DoFetchData(session_id, AsDataTypeId(block_id), nullptr, nullptr, &resp, &controller);
   LOG(INFO) << "Fetch data request responded with: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Unable to get piece of data block");
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index d468190..e09366e 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -698,7 +698,7 @@ TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
   ASSERT_FALSE(new_dir.empty());
   string new_uuid;
   ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
-  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  dd_manager->MarkDirFailedByUuid(new_uuid);
   ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
 
   // The group should be the updated even after restarting the tablet server.
@@ -707,7 +707,7 @@ TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
   ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
   ASSERT_EQ(kNumDirs, dir_group.size());
   ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
-  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  dd_manager->MarkDirFailedByUuid(new_uuid);
   ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
 }
 
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 20e1e28..8695312 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -29,7 +29,6 @@
 #include <boost/bind.hpp> // IWYU pragma: keep
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/clock/clock.h"
@@ -1515,15 +1514,15 @@ void TSTabletManager::FailTabletsInDataDir(const string& uuid) {
   int uuid_idx;
   CHECK(dd_manager->FindUuidIndexByUuid(uuid, &uuid_idx))
       << Substitute("No data directory found with UUID $0", uuid);
-  if (fs_manager_->dd_manager()->IsDataDirFailed(uuid_idx)) {
+  if (fs_manager_->dd_manager()->IsDirFailed(uuid_idx)) {
     LOG(WARNING) << "Data directory is already marked failed.";
     return;
   }
   // Fail the directory to prevent other tablets from being placed in it.
-  dd_manager->MarkDataDirFailed(uuid_idx);
-  set<string> tablets = dd_manager->FindTabletsByDataDirUuidIdx(uuid_idx);
+  dd_manager->MarkDirFailed(uuid_idx);
+  set<string> tablets = dd_manager->FindTabletsByDirUuidIdx(uuid_idx);
   LOG(INFO) << Substitute("Data dir $0 has $1 tablets", uuid, tablets.size());
-  for (const string& tablet_id : dd_manager->FindTabletsByDataDirUuidIdx(uuid_idx)) {
+  for (const string& tablet_id : dd_manager->FindTabletsByDirUuidIdx(uuid_idx)) {
     FailTabletAndScheduleShutdown(tablet_id);
   }
 }
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 863c1b7..7e1fc15 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -32,6 +32,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/tablet_copy_client.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tserver.pb.h"
@@ -73,7 +74,6 @@ class TabletReportPB;
 
 namespace tablet {
 class TabletMetadata;
-class TabletReplica;
 }
 
 namespace tserver {


Mime
View raw message