kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: separate DataDirManager from BlockManagers
Date Tue, 22 Aug 2017 23:17:48 GMT
Repository: kudu
Updated Branches:
  refs/heads/master e1c753e58 -> 61a227735


separate DataDirManager from BlockManagers

Currently, the DataDirManager is owned by the BlockManagers. Since only
blocks are placed in 'data dirs' (i.e. subdirectories named 'data' under
the roots specified by 'fs_data_dirs'), this hierarchy made sense.
However, it would be nice to track other files that fall within
'fs_data_dirs' (e.g. tablet-metadata, consensus-metadata). Splitting the
directory manager from the block manager makes this more feasible.

This patch introduces a number of changes to the FsManager,
BlockManager, and DataDirManager with respect to initialization.
- BlockManagers no longer have a Create() function
- BlockManager's dtor will now just wait for DataDir closures to finish
  instead of shutting down the DataDirManager, so as not to directly
  affect the DataDirManager
- DataDirManagers now have two static constructors: one to open an
  existing layout, and another to create and open a new layout
- FsManagers will only create the BlockManager when opening an fs layout
- FsManagers will only Open() a DataDirManager if one has not already
  been constructed
- A validator is added to check the value of FLAGS_block_manager
  before any of the above initialization can occur

Change-Id: I387e4e88bd65298970a1e4201879de08fac07ace
Reviewed-on: http://gerrit.cloudera.org:8080/7602
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/61a22773
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/61a22773
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/61a22773

Branch: refs/heads/master
Commit: 61a227735032974ea0c5c3045be7f092a41ac350
Parents: e1c753e
Author: Andrew Wong <awong@cloudera.com>
Authored: Fri Aug 4 16:06:34 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Tue Aug 22 23:17:06 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc |  61 +++++++-----
 src/kudu/fs/block_manager-test.cc        |  79 ++++++++-------
 src/kudu/fs/block_manager.h              |  12 ---
 src/kudu/fs/data_dirs-test.cc            |  11 +-
 src/kudu/fs/data_dirs.cc                 | 138 +++++++++++++++++++-------
 src/kudu/fs/data_dirs.h                  | 135 ++++++++++++++++++-------
 src/kudu/fs/file_block_manager.cc        |  38 ++-----
 src/kudu/fs/file_block_manager.h         |  18 ++--
 src/kudu/fs/fs_manager-test.cc           |   1 +
 src/kudu/fs/fs_manager.cc                |  58 +++++++----
 src/kudu/fs/fs_manager.h                 |  11 +-
 src/kudu/fs/log_block_manager-test.cc    | 100 +++++++++++--------
 src/kudu/fs/log_block_manager.cc         |  45 +++------
 src/kudu/fs/log_block_manager.h          |  17 ++--
 src/kudu/util/path_util.cc               |  11 +-
 src/kudu/util/path_util.h                |   8 +-
 16 files changed, 440 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 4d1e919..c501012 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -148,39 +148,52 @@ class BlockManagerStressTest : public KuduTest {
     // depending on the number of CPUs on the system.
     FLAGS_cache_force_single_shard = true;
 
-    if (FLAGS_block_manager_paths.empty()) {
-      data_dirs_.push_back(test_dir_);
-    } else {
-      data_dirs_ = strings::Split(FLAGS_block_manager_paths, ",",
-                                  strings::SkipEmpty());
-    }
-
     // Defer block manager creation until after the above flags are set.
     bm_.reset(CreateBlockManager());
+    bm_->Open(nullptr);
+    dd_manager_->CreateDataDirGroup(test_tablet_name_);
+    CHECK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
-  virtual void SetUp() OVERRIDE {
-    CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open(nullptr));
-    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
-    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
-  }
+  virtual void TearDown() override {
+    // Ensure the proper destructor order. The directory manager must outlive
+    // the block manager.
+    bm_.reset();
 
-  virtual void TearDown() OVERRIDE {
-    // If non-standard paths were provided we need to delete them in
-    // between test runs.
+    // If non-standard paths were provided we need to delete them in between
+    // test runs.
     if (!FLAGS_block_manager_paths.empty()) {
-      for (const auto& dd : data_dirs_) {
+      for (const auto& dd : dd_manager_->GetDataRoots()) {
         WARN_NOT_OK(env_->DeleteRecursively(dd),
                     Substitute("Couldn't recursively delete $0", dd));
       }
     }
+    dd_manager_.reset();
   }
 
   BlockManager* CreateBlockManager() {
+    // The directory manager must outlive the block manager. Destroy the block
+    // manager first to enforce this.
+    bm_.reset();
+
+    vector<string> data_dirs;
+    if (FLAGS_block_manager_paths.empty()) {
+      data_dirs.push_back(test_dir_);
+    } else {
+      data_dirs = strings::Split(FLAGS_block_manager_paths, ",",
+                                 strings::SkipEmpty());
+    }
+    if (!dd_manager_) {
+      // Create a new directory manager if necessary.
+      CHECK_OK(DataDirManager::CreateNew(env_, data_dirs,
+          DataDirManagerOptions(), &dd_manager_));
+    } else {
+      // Open an existing directory manager, wiping away in-memory maps.
+      CHECK_OK(DataDirManager::OpenExisting(env_, data_dirs,
+          DataDirManagerOptions(), &dd_manager_));
+    }
     BlockManagerOptions opts;
-    opts.root_paths = data_dirs_;
-    return new T(env_, test_error_manager_.get(), opts);
+    return new T(env_, dd_manager_.get(), test_error_manager_.get(), opts);
   }
 
   void RunTest(double secs) {
@@ -241,9 +254,6 @@ class BlockManagerStressTest : public KuduTest {
   void InjectNonFatalInconsistencies();
 
  protected:
-  // Directories where blocks will be written.
-  vector<string> data_dirs_;
-
   // Used to generate random data. All PRNG instances are seeded with this
   // value to ensure that the test is reproducible.
   int rand_seed_;
@@ -263,6 +273,9 @@ class BlockManagerStressTest : public KuduTest {
   // The block manager.
   gscoped_ptr<BlockManager> bm_;
 
+  // The directory manager.
+  unique_ptr<DataDirManager> dd_manager_;
+
   // The error manager.
   unique_ptr<FsErrorManager> test_error_manager_;
 
@@ -485,7 +498,7 @@ void BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
 
 template <>
 void BlockManagerStressTest<LogBlockManager>::InjectNonFatalInconsistencies() {
-  LBMCorruptor corruptor(env_, data_dirs_, rand_seed_);
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), rand_seed_);
   ASSERT_OK(corruptor.Init());
 
   for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
@@ -525,7 +538,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
     this->bm_.reset(this->CreateBlockManager());
     FsReport report;
     ASSERT_OK(this->bm_->Open(&report));
-    ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
+    ASSERT_OK(this->dd_manager_->LoadDataDirGroupFromPB(this->test_tablet_name_,
                                                               this->test_group_pb_));
     ASSERT_OK(report.LogAndCheckForFatalErrors());
     this->RunTest(FLAGS_test_duration_secs / kNumStarts);

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index b02407b..fb4251b 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -110,18 +110,15 @@ class BlockManagerTest : public KuduTest {
     test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
     test_error_manager_(new FsErrorManager()),
     bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
-                           shared_ptr<MemTracker>(),
-                           { test_dir_ })) {
+                           shared_ptr<MemTracker>())) {
   }
 
-  virtual void SetUp() OVERRIDE {
-    CHECK_OK(bm_->Create());
-    // Pass in a report to prevent the block manager from logging
-    // unnecessarily.
+  virtual void SetUp() override {
+    // Pass in a report to prevent the block manager from logging unnecessarily.
     FsReport report;
     CHECK_OK(bm_->Open(&report));
-    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
-    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+    CHECK_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+    CHECK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
   void DistributeBlocksAcrossDirs(int num_dirs, int num_blocks_per_dir) {
@@ -129,7 +126,7 @@ class BlockManagerTest : public KuduTest {
     string tablet_name = Substitute("$0_disks", num_dirs);
     CreateBlockOptions opts({ tablet_name });
     FLAGS_fs_target_data_dirs_per_tablet = num_dirs;
-    ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup(tablet_name));
+    ASSERT_OK(dd_manager_->CreateDataDirGroup(tablet_name));
     int num_blocks = num_dirs * num_blocks_per_dir;
 
     // Write 'num_blocks' blocks to this data dir group.
@@ -145,13 +142,16 @@ class BlockManagerTest : public KuduTest {
 
  protected:
   T* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
-                        const shared_ptr<MemTracker>& parent_mem_tracker,
-                        const vector<string>& paths) {
+                        const shared_ptr<MemTracker>& parent_mem_tracker) {
+    if (!dd_manager_) {
+      // Create a new directory manager if necessary.
+      CHECK_OK(DataDirManager::CreateNew(env_, { test_dir_ },
+          DataDirManagerOptions(), &dd_manager_));
+    }
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
     opts.parent_mem_tracker = parent_mem_tracker;
-    opts.root_paths = paths;
-    return new T(env_, test_error_manager_.get(), opts);
+    return new T(env_, this->dd_manager_.get(), test_error_manager_.get(), opts);
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
@@ -159,16 +159,23 @@ class BlockManagerTest : public KuduTest {
                             const vector<string>& paths,
                             bool create,
                             bool load_test_group = true) {
-    bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker, paths));
+    // The directory manager must outlive the block manager. Destroy the block
+    // manager first to enforce this.
+    bm_.reset();
+    DataDirManagerOptions opts;
+    opts.metric_entity = metric_entity;
     if (create) {
-      RETURN_NOT_OK(bm_->Create());
+      RETURN_NOT_OK(DataDirManager::CreateNew(env_, paths, opts, &dd_manager_));
+    } else {
+      RETURN_NOT_OK(DataDirManager::OpenExisting(env_, paths, opts, &dd_manager_));
     }
+    bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker));
     RETURN_NOT_OK(bm_->Open(nullptr));
 
     // Certain tests may maintain their own directory groups, in which case
     // the default test group should not be used.
     if (load_test_group) {
-      RETURN_NOT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
+      RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
     }
     return Status::OK();
   }
@@ -205,21 +212,20 @@ class BlockManagerTest : public KuduTest {
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
   unique_ptr<FsErrorManager> test_error_manager_;
-  gscoped_ptr<T> bm_;
+  unique_ptr<DataDirManager> dd_manager_;
+  unique_ptr<T> bm_;
 };
 
 template <>
 void BlockManagerTest<LogBlockManager>::SetUp() {
   RETURN_NOT_LOG_BLOCK_MANAGER();
-  CHECK_OK(bm_->Create());
-  // Pass in a report to prevent the block manager from logging
-  // unnecessarily.
+  // Pass in a report to prevent the block manager from logging unnecessarily.
   FsReport report;
-  CHECK_OK(bm_->Open(&report));
-  CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
+  ASSERT_OK(bm_->Open(&report));
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
 
   // Store the DataDirGroupPB for tests that reopen the block manager.
-  CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+  CHECK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
 }
 
 template <>
@@ -287,7 +293,7 @@ void BlockManagerTest<LogBlockManager>::RunBlockDistributionTest(const vector<st
 template <>
 void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>& paths) {
   // Ensure that each path has an instance file and that it's well-formed.
-  for (const string& path : paths) {
+  for (const string& path : dd_manager_->GetDataDirs()) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(path, &children));
     ASSERT_EQ(3, children.size());
@@ -307,7 +313,7 @@ void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
   // high probability.
   CreateBlockOptions opts({ "multipath_test" });
   FLAGS_fs_target_data_dirs_per_tablet = 3;
-  ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup("multipath_test"));
+  ASSERT_OK(dd_manager_->CreateDataDirGroup("multipath_test"));
 
   // Write twenty blocks.
   const char* kTestData = "test data";
@@ -338,7 +344,7 @@ void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& p
   // yield two containers per path.
   CreateBlockOptions opts({ "multipath_test" });
   FLAGS_fs_target_data_dirs_per_tablet = 3;
-  ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup("multipath_test"));
+  ASSERT_OK(dd_manager_->CreateDataDirGroup("multipath_test"));
 
   const char* kTestData = "test data";
   ScopedWritableBlockCloser closer;
@@ -351,16 +357,18 @@ void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& p
   }
   ASSERT_OK(closer.CloseBlocks());
 
-  // Verify the results. Each path has dot, dotdot, instance file.
-  // (numPaths * 2) containers were created, each consisting of 2 files.
-  // Thus, there should be a total of (numPaths * (3 + 4)) files.
+  // Verify the results. (numPaths * 2) containers were created, each
+  // consisting of 2 files. Thus, there should be a total of
+  // (numPaths * 4) files, ignoring '.', '..', and instance files.
   int sum = 0;
   for (const string& path : paths) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(path, &children));
-    sum += children.size();
+    int files_in_path = 0;
+    ASSERT_OK(CountFiles(path, &files_in_path));
+    sum += files_in_path;
   }
-  ASSERT_EQ(paths.size() * 7, sum);
+  ASSERT_EQ(paths.size() * 4, sum);
 }
 
 template <>
@@ -460,14 +468,14 @@ TYPED_TEST(BlockManagerTest, CreateBlocksInDataDirs) {
                                     "directory group registered for tablet");
 
   // Ensure the data dir groups can only be created once.
-  s = this->bm_->dd_manager()->CreateDataDirGroup(this->test_tablet_name_);
+  s = this->dd_manager_->CreateDataDirGroup(this->test_tablet_name_);
   ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Tried to create directory group for tablet "
                                     "but one is already registered");
 
   DataDirGroupPB test_group_pb;
   // Check that the in-memory DataDirGroup did not change.
-  ASSERT_TRUE(this->bm_->dd_manager()->GetDataDirGroupPB(
+  ASSERT_TRUE(this->dd_manager_->GetDataDirGroupPB(
       this->test_tablet_name_, &test_group_pb));
   ASSERT_TRUE(MessageDifferencer::Equals(test_group_pb, this->test_group_pb_));
 }
@@ -667,8 +675,7 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
   // on-disk metadata should still be clean.
   gscoped_ptr<BlockManager> new_bm(this->CreateBlockManager(
       scoped_refptr<MetricEntity>(),
-      MemTracker::CreateTracker(-1, "other tracker"),
-      { this->test_dir_ }));
+      MemTracker::CreateTracker(-1, "other tracker")));
   ASSERT_OK(new_bm->Open(nullptr));
 
   // Test that the state of all three blocks is properly reflected.
@@ -694,6 +701,7 @@ TYPED_TEST(BlockManagerTest, BlockDistributionTest) {
   vector<string> paths;
   for (int i = 0; i < 5; i++) {
     paths.push_back(this->GetTestPath(Substitute("block_dist_path$0", i)));
+    ASSERT_OK(this->env_->CreateDir(paths[i]));
   }
   ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                      shared_ptr<MemTracker>(),
@@ -708,6 +716,7 @@ TYPED_TEST(BlockManagerTest, MultiPathTest) {
   vector<string> paths;
   for (int i = 0; i < 3; i++) {
     paths.push_back(this->GetTestPath(Substitute("path$0", i)));
+    ASSERT_OK(this->env_->CreateDir(paths[i]));
   }
   ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                      shared_ptr<MemTracker>(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index aa13a97..0177d33 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -189,9 +189,6 @@ struct BlockManagerOptions {
   // If NULL, new memory trackers will be parented to the root tracker.
   std::shared_ptr<MemTracker> parent_mem_tracker;
 
-  // The paths where data blocks will be stored. Cannot be empty.
-  std::vector<std::string> root_paths;
-
   // Whether the block manager should only allow reading. Defaults to false.
   bool read_only;
 };
@@ -202,12 +199,6 @@ class BlockManager {
  public:
   virtual ~BlockManager() {}
 
-  // Creates a new on-disk representation for this block manager. Must be
-  // followed up with a call to Open() to use the block manager.
-  //
-  // Returns an error if one already exists or cannot be created.
-  virtual Status Create() = 0;
-
   // Opens an existing on-disk representation of this block manager and
   // checks it for inconsistencies. If found, and if the block manager was not
   // constructed in read-only mode, an attempt will be made to repair them.
@@ -266,9 +257,6 @@ class BlockManager {
   // even exist after the call.
   virtual Status GetAllBlockIds(std::vector<BlockId>* block_ids) = 0;
 
-  // Exposes the underlying DataDirManager.
-  virtual DataDirManager* dd_manager() = 0;
-
   // Exposes the FsErrorManager used to handle fs errors.
   virtual FsErrorManager* error_manager() = 0;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/data_dirs-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 81a4036..e498a1f 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -26,6 +26,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -50,21 +51,20 @@ using internal::DataDirGroup;
 
 static const char* kDirNamePrefix = "test_data_dir";
 static const int kNumDirs = 10;
-static const int kMaxPaths = (1 << 16) - 1;
 
 class DataDirGroupTest : public KuduTest {
  public:
   DataDirGroupTest() :
       test_tablet_name_("test_tablet"),
       test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
-      entity_(METRIC_ENTITY_server.Instantiate(&registry_, "test")),
-      dd_manager_(new DataDirManager(env_, entity_, "file", GetDirNames(kNumDirs))) {}
+      entity_(METRIC_ENTITY_server.Instantiate(&registry_, "test")) {}
 
   virtual void SetUp() override {
     KuduTest::SetUp();
     FLAGS_fs_target_data_dirs_per_tablet = kNumDirs / 2 + 1;
-    ASSERT_OK(dd_manager_->Create(0));
-    ASSERT_OK(dd_manager_->Open(kMaxPaths, DataDirManager::LockMode::NONE));
+    DataDirManagerOptions opts;
+    opts.metric_entity = entity_;
+    ASSERT_OK(DataDirManager::CreateNew(env_, GetDirNames(kNumDirs), opts, &dd_manager_));
   }
 
  protected:
@@ -73,6 +73,7 @@ class DataDirGroupTest : public KuduTest {
     for (int i = 0; i < num_dirs; i++) {
       string dir_name = Substitute("$0-$1", kDirNamePrefix, i);
       ret.push_back(GetTestPath(dir_name));
+      CHECK_OK(env_->CreateDir(ret[i]));
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/data_dirs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 99177cf..96b476e 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -86,6 +86,12 @@ DEFINE_int32(fs_data_dirs_full_disk_cache_seconds, 30,
 TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, advanced);
 TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, evolving);
 
+DEFINE_bool(fs_lock_data_dirs, true,
+            "Lock the data directories to prevent concurrent usage. "
+            "Note that read-only concurrent usage is still allowed.");
+TAG_FLAG(fs_lock_data_dirs, unsafe);
+TAG_FLAG(fs_lock_data_dirs, evolving);
+
 METRIC_DEFINE_gauge_uint64(server, data_dirs_failed,
                            "Data Directories Failed",
                            kudu::MetricUnit::kDataDirectories,
@@ -96,6 +102,8 @@ METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
                            kudu::MetricUnit::kDataDirectories,
                            "Number of data directories whose disks are currently full");
 
+DECLARE_string(block_manager);
+
 namespace kudu {
 
 namespace fs {
@@ -204,7 +212,7 @@ void DataDir::Shutdown() {
     return;
   }
 
-  pool_->Wait();
+  WaitOnClosures();
   pool_->Shutdown();
   is_shutdown_ = true;
 }
@@ -265,18 +273,24 @@ Status DataDir::RefreshIsFull(RefreshMode mode) {
   return Status::OK();
 }
 
+const char* DataDirManager::kDataDirName = "data";
+
+DataDirManagerOptions::DataDirManagerOptions()
+  : read_only(false) {
+}
+
 DataDirManager::DataDirManager(Env* env,
-                               scoped_refptr<MetricEntity> metric_entity,
-                               string block_manager_type,
-                               vector<string> paths)
+                               DataDirManagerOptions opts,
+                               vector<string> canonicalized_data_roots)
     : env_(env),
-      block_manager_type_(std::move(block_manager_type)),
-      paths_(std::move(paths)),
+      block_manager_type_(FLAGS_block_manager),
+      read_only_(opts.read_only),
+      canonicalized_data_fs_roots_(std::move(canonicalized_data_roots)),
       rng_(GetRandomSeed32()) {
-  DCHECK_GT(paths_.size(), 0);
+  DCHECK_GT(canonicalized_data_fs_roots_.size(), 0);
 
-  if (metric_entity) {
-    metrics_.reset(new DataDirMetrics(metric_entity));
+  if (opts.metric_entity) {
+    metrics_.reset(new DataDirMetrics(opts.metric_entity));
   }
 }
 
@@ -284,6 +298,12 @@ DataDirManager::~DataDirManager() {
   Shutdown();
 }
 
+void DataDirManager::WaitOnClosures() {
+  for (const auto& dd : data_dirs_) {
+    dd->WaitOnClosures();
+  }
+}
+
 void DataDirManager::Shutdown() {
   // We may be waiting here for a while on outstanding closures.
   LOG_SLOW_EXECUTION(INFO, 1000,
@@ -295,34 +315,58 @@ void DataDirManager::Shutdown() {
   }
 }
 
-Status DataDirManager::Create(int flags) {
+Status DataDirManager::OpenExisting(Env* env, vector<string> data_fs_roots,
+                                    DataDirManagerOptions opts,
+                                    unique_ptr<DataDirManager>* dd_manager) {
+  unique_ptr<DataDirManager> dm;
+  dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
+  RETURN_NOT_OK(dm->Open());
+  dd_manager->swap(dm);
+  return Status::OK();
+}
+
+Status DataDirManager::CreateNew(Env* env, vector<string> data_fs_roots,
+                                 DataDirManagerOptions opts,
+                                 unique_ptr<DataDirManager>* dd_manager) {
+  unique_ptr<DataDirManager> dm;
+  dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
+  RETURN_NOT_OK(dm->Create());
+  RETURN_NOT_OK(dm->Open());
+  dd_manager->swap(dm);
+  return Status::OK();
+}
+
+Status DataDirManager::Create() {
+  CHECK(!read_only_);
+
   deque<ScopedFileDeleter*> delete_on_failure;
   ElementDeleter d(&delete_on_failure);
 
   // The UUIDs and indices will be included in every instance file.
   ObjectIdGenerator gen;
-  vector<string> all_uuids(paths_.size());
+  vector<string> all_uuids(canonicalized_data_fs_roots_.size());
   for (string& u : all_uuids) {
     u = gen.Next();
   }
   int idx = 0;
 
-  // Ensure the data paths exist and create the instance files.
+  // Ensure the data dirs exist and create the instance files.
   unordered_set<string> to_sync;
-  for (const auto& p : paths_) {
+  for (const string& root : canonicalized_data_fs_roots_) {
+    string data_dir = JoinPathSegments(root, kDataDirName);
     bool created;
-    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, p, &created),
-                          Substitute("Could not create directory $0", p));
+    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, data_dir, &created),
+        Substitute("Could not create directory $0", data_dir));
     if (created) {
-      delete_on_failure.push_front(new ScopedFileDeleter(env_, p));
-      to_sync.insert(DirName(p));
+      delete_on_failure.push_front(new ScopedFileDeleter(env_, data_dir));
+      to_sync.insert(root);
     }
 
-    if (flags & FLAG_CREATE_TEST_HOLE_PUNCH) {
-      RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, p), kHolePunchErrorMsg);
+    if (block_manager_type_ == "log") {
+      RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, data_dir), kHolePunchErrorMsg);
     }
 
-    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
+    string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
     PathInstanceMetadataFile metadata(env_, block_manager_type_,
                                       instance_filename);
     RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids), instance_filename);
@@ -331,11 +375,9 @@ Status DataDirManager::Create(int flags) {
   }
 
   // Ensure newly created directories are synchronized to disk.
-  if (flags & FLAG_CREATE_FSYNC) {
-    for (const string& dir : to_sync) {
-      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
-                            Substitute("Unable to synchronize directory $0", dir));
-    }
+  for (const string& dir : to_sync) {
+    RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
+                          Substitute("Unable to synchronize directory $0", dir));
   }
 
   // Success: don't delete any files.
@@ -345,29 +387,40 @@ Status DataDirManager::Create(int flags) {
   return Status::OK();
 }
 
-Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
+Status DataDirManager::Open() {
   vector<PathInstanceMetadataFile*> instances;
   vector<unique_ptr<DataDir>> dds;
+  LockMode lock_mode;
+  if (!FLAGS_fs_lock_data_dirs) {
+    lock_mode = LockMode::NONE;
+  } else if (read_only_) {
+    lock_mode = LockMode::OPTIONAL;
+  } else {
+    lock_mode = LockMode::MANDATORY;
+  }
+  int max_data_dirs = block_manager_type_ == "file" ? (1 << 16) - 1 : kuint32max;
 
   int i = 0;
-  for (const auto& p : paths_) {
+  // Create a directory for all data dirs specified by the user.
+  for (const auto& root : canonicalized_data_fs_roots_) {
+    string data_dir = JoinPathSegments(root, kDataDirName);
+    string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
     // Open and lock the data dir's metadata instance file.
-    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
     gscoped_ptr<PathInstanceMetadataFile> instance(
         new PathInstanceMetadataFile(env_, block_manager_type_,
                                      instance_filename));
     RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
                           Substitute("Could not open $0", instance_filename));
-    if (mode != LockMode::NONE) {
+    if (lock_mode != LockMode::NONE) {
       Status s = instance->Lock();
       if (!s.ok()) {
         Status new_status = s.CloneAndPrepend(Substitute(
             "Could not lock $0", instance_filename));
-        if (mode == LockMode::OPTIONAL) {
+        if (lock_mode == LockMode::OPTIONAL) {
           LOG(WARNING) << new_status.ToString();
           LOG(WARNING) << "Proceeding without lock";
         } else {
-          DCHECK(LockMode::MANDATORY == mode);
+          DCHECK(LockMode::MANDATORY == lock_mode);
           RETURN_NOT_OK(new_status);
         }
       }
@@ -383,11 +436,11 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
     // Figure out what filesystem the data directory is on.
     DataDirFsType fs_type = DataDirFsType::OTHER;
     bool result;
-    RETURN_NOT_OK(env_->IsOnExtFilesystem(p, &result));
+    RETURN_NOT_OK(env_->IsOnExtFilesystem(data_dir, &result));
     if (result) {
       fs_type = DataDirFsType::EXT;
     } else {
-      RETURN_NOT_OK(env_->IsOnXfsFilesystem(p, &result));
+      RETURN_NOT_OK(env_->IsOnXfsFilesystem(data_dir, &result));
       if (result) {
         fs_type = DataDirFsType::XFS;
       }
@@ -395,7 +448,7 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
 
     // Create the data directory in-memory structure itself.
     unique_ptr<DataDir> dd(new DataDir(
-        env_, metrics_.get(), fs_type, p,
+        env_, metrics_.get(), fs_type, data_dir,
         unique_ptr<PathInstanceMetadataFile>(instance.release()),
         unique_ptr<ThreadPool>(pool.release())));
 
@@ -407,8 +460,8 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
   }
 
   RETURN_NOT_OK_PREPEND(PathInstanceMetadataFile::CheckIntegrity(instances),
-                        Substitute("Could not verify integrity of files: $0",
-                                   JoinStrings(paths_, ",")));
+      Substitute("Could not verify integrity of files: $0",
+                 JoinStrings(JoinPathSegmentsV(canonicalized_data_fs_roots_, kDataDirName), ",")));
 
   // Use the per-dir thread pools to delete temporary files in parallel.
   for (const auto& dd : dds) {
@@ -423,6 +476,7 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
   UuidIndexByUuidMap idx_by_uuid;
   UuidIndexMap dd_by_uuid_idx;
   ReverseUuidIndexMap uuid_idx_by_dd;
+  TabletsByUuidIndexMap tablets_by_uuid_idx_map;
   for (const auto& dd : dds) {
     const PathSetPB& path_set = dd->instance()->metadata()->path_set();
     uint32_t idx = -1;
@@ -441,14 +495,14 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
     InsertOrDie(&idx_by_uuid, path_set.uuid(), idx);
     InsertOrDie(&dd_by_uuid_idx, idx, dd.get());
     InsertOrDie(&uuid_idx_by_dd, dd.get(), idx);
-    InsertOrDie(&tablets_by_uuid_idx_map_, idx, {});
+    InsertOrDie(&tablets_by_uuid_idx_map, idx, {});
   }
-
   data_dirs_.swap(dds);
   uuid_by_idx_.swap(uuid_by_idx);
   idx_by_uuid_.swap(idx_by_uuid);
   data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
   uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
+  tablets_by_uuid_idx_map_.swap(tablets_by_uuid_idx_map);
   return Status::OK();
 }
 
@@ -674,5 +728,13 @@ void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<uint16_t>& uui
   }
 }
 
+vector<string> DataDirManager::GetDataRoots() const {
+  return canonicalized_data_fs_roots_;
+}
+
+vector<string> DataDirManager::GetDataDirs() const {
+  return JoinPathSegmentsV(canonicalized_data_fs_roots_, kDataDirName);
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/data_dirs.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index 4334e78..abdbcbd 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -185,14 +185,26 @@ class DataDir {
   DISALLOW_COPY_AND_ASSIGN(DataDir);
 };
 
+// Directory manager creation options.
+struct DataDirManagerOptions {
+  DataDirManagerOptions();
+
+  // The entity under which all metrics should be grouped. If null, metrics
+  // will not be produced.
+  //
+  // Defaults to null.
+  scoped_refptr<MetricEntity> metric_entity;
+
+  // Whether the directory manager should only allow reading.
+  //
+  // Defaults to false.
+  bool read_only;
+};
+
 // Encapsulates knowledge of data directory management on behalf of block
 // managers.
 class DataDirManager {
  public:
-  // Flags for Create().
-  static const int FLAG_CREATE_TEST_HOLE_PUNCH = 0x1;
-  static const int FLAG_CREATE_FSYNC = 0x2;
-
   enum class LockMode {
     MANDATORY,
     OPTIONAL,
@@ -204,25 +216,37 @@ class DataDirManager {
     USE_FLAG_SPEC,
   };
 
-  DataDirManager(Env* env,
-                 scoped_refptr<MetricEntity> metric_entity,
-                 std::string block_manager_type,
-                 std::vector<std::string> paths);
+  // Constructs a directory manager and creates its necessary files on-disk.
+  //
+  // Returns an error if any of the directories already exist.
+  static Status CreateNew(Env* env, std::vector<std::string> data_fs_roots,
+                          DataDirManagerOptions opts,
+                          std::unique_ptr<DataDirManager>* dd_manager);
+
+  // Constructs a directory manager and indexes the files found on-disk.
+  //
+  // Returns an error if the number of on-disk directories found exceeds the
+  // max allowed, or if locks need to be acquired and cannot be.
+  static Status OpenExisting(Env* env, std::vector<std::string> data_fs_roots,
+                             DataDirManagerOptions opts,
+                             std::unique_ptr<DataDirManager>* dd_manager);
+
   ~DataDirManager();
 
   // Shuts down all directories' thread pools.
   void Shutdown();
 
-  // Initializes the data directories on disk.
-  //
-  // Returns an error if initialized directories already exist.
-  Status Create(int flags);
+  // Waits on all directories' thread pools.
+  void WaitOnClosures();
 
-  // Opens existing data directories from disk.
-  //
-  // Returns an error if the number of on-disk data directories found exceeds
-  // 'max_data_dirs', or if 'mode' is MANDATORY and locks could not be taken.
-  Status Open(int max_data_dirs, LockMode mode);
+  // Returns a list of all data dirs.
+  const std::vector<std::unique_ptr<DataDir>>& data_dirs() const {
+    return data_dirs_;
+  }
+
+  // ==========================================================================
+  // Tablet Placement
+  // ==========================================================================
 
   // Deserializes a DataDirGroupPB and associates the resulting DataDirGroup
   // with a tablet_id.
@@ -258,29 +282,14 @@ class DataDirManager {
   // there is no room in the group, returns an error.
   Status GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir);
 
-  // Finds a data directory by uuid index, returning nullptr if it can't be
-  // found.
-  //
-  // More information on uuid indexes and their relation to data directories
-  // can be found next to PathSetPB in fs.proto.
-  DataDir* FindDataDirByUuidIndex(uint16_t uuid_idx) const;
-
-  // Finds a uuid index by data directory, returning false if it can't be found.
-  bool FindUuidIndexByDataDir(DataDir* dir,
-                              uint16_t* uuid_idx) const;
-
-  // Finds a uuid index by UUID, returning false if it can't be found.
-  bool FindUuidIndexByUuid(const std::string& uuid, uint16_t* uuid_idx) const;
-
-  // Returns a list of all data dirs.
-  const std::vector<std::unique_ptr<DataDir>>& data_dirs() const {
-    return data_dirs_;
-  }
-
   // Finds the set of tablet_ids in the data dir specified by 'uuid_idx' and
   // returns a copy, returning an empty set if none are found.
   std::set<std::string> FindTabletsByDataDirUuidIdx(uint16_t uuid_idx);
 
+  // ==========================================================================
+  // Directory Health
+  // ==========================================================================
+
   // Adds 'uuid_idx' to the set of failed data directories. This directory will
   // no longer be used. Logs an error message prefixed with 'error_message'
   // describing what directories are affected.
@@ -294,6 +303,33 @@ class DataDirManager {
     return failed_data_dirs_;
   }
 
+  // ==========================================================================
+  // Directory Paths
+  // ==========================================================================
+
+  // Return a list of the canonicalized root directory names.
+  std::vector<std::string> GetDataRoots() const;
+
+  // Return a list of the canonicalized data directory names.
+  std::vector<std::string> GetDataDirs() const;
+
+  // ==========================================================================
+  // Representation Conversion
+  // ==========================================================================
+
+  // Finds a data directory by uuid index, returning nullptr if it can't be
+  // found.
+  //
+  // More information on uuid indexes and their relation to data directories
+  // can be found next to PathSetPB in fs.proto.
+  DataDir* FindDataDirByUuidIndex(uint16_t uuid_idx) const;
+
+  // Finds a uuid index by data directory, returning false if it can't be found.
+  bool FindUuidIndexByDataDir(DataDir* dir, uint16_t* uuid_idx) const;
+
+  // Finds a uuid index by UUID, returning false if it can't be found.
+  bool FindUuidIndexByUuid(const std::string& uuid, uint16_t* uuid_idx) const;
+
  private:
   FRIEND_TEST(DataDirGroupTest, TestCreateGroup);
   FRIEND_TEST(DataDirGroupTest, TestLoadFromPB);
@@ -301,6 +337,25 @@ class DataDirManager {
   FRIEND_TEST(DataDirGroupTest, TestLoadBalancingDistribution);
   FRIEND_TEST(DataDirGroupTest, TestFailedDirNotAddedToGroup);
 
+  // The base name of a data directory.
+  static const char* kDataDirName;
+
+  // Constructs a directory manager.
+  DataDirManager(Env* env,
+                 DataDirManagerOptions opts,
+                 std::vector<std::string> canonicalized_data_roots);
+
+  // Initializes the data directories on disk.
+  //
+  // Returns an error if initialized directories already exist.
+  Status Create();
+
+  // Opens existing data roots from disk and indexes the files found.
+  //
+  // Returns an error if the number of on-disk directories found exceeds the
+  // max allowed, or if locks need to be acquired and cannot be.
+  Status Open();
+
   // Repeatedly selects directories from those available to put into a new
   // DataDirGroup until 'group_indices' reaches 'target_size' elements.
   // Selection is based on "The Power of Two Choices in Randomized Load
@@ -322,7 +377,13 @@ class DataDirManager {
 
   Env* env_;
   const std::string block_manager_type_;
-  const std::vector<std::string> paths_;
+  const bool read_only_;
+
+  // The canonicalized roots provided to the constructor, taken verbatim.
+  //
+  // - The first data root is used as the metadata root.
+  // - Common roots in the collections have been deduplicated.
+  const std::vector<std::string> canonicalized_data_fs_roots_;
 
   std::unique_ptr<DataDirMetrics> metrics_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 3d5f8ef..10633e6 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -447,7 +447,7 @@ class FileReadableBlock : public ReadableBlock {
 };
 
 void FileReadableBlock::HandleError(const Status& s) const {
-  const DataDir* dir = block_manager_->dd_manager()->FindDataDirByUuidIndex(
+  const DataDir* dir = block_manager_->dd_manager_->FindDataDirByUuidIndex(
       internal::FileBlockLocation::GetDataDirIdx(block_id_));
   HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(dir));
 }
@@ -525,9 +525,6 @@ size_t FileReadableBlock::memory_footprint() const {
 // FileBlockManager
 ////////////////////////////////////////////////////////////
 
-static const char* kBlockManagerType = "file";
-static const int kMaxPaths = (1 << 16) - 1;
-
 Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& location) {
   vector<string> parent_dirs;
   location.GetAllParentDirs(&parent_dirs);
@@ -555,7 +552,7 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
 
 bool FileBlockManager::FindBlockPath(const BlockId& block_id,
                                      string* path) const {
-  DataDir* dir = dd_manager_.FindDataDirByUuidIndex(
+  DataDir* dir = dd_manager_->FindDataDirByUuidIndex(
       internal::FileBlockLocation::GetDataDirIdx(block_id));
   if (dir) {
     *path = internal::FileBlockLocation::FromBlockId(
@@ -565,11 +562,12 @@ bool FileBlockManager::FindBlockPath(const BlockId& block_id,
 }
 
 FileBlockManager::FileBlockManager(Env* env,
+                                   DataDirManager* dd_manager,
                                    FsErrorManager* error_manager,
                                    const BlockManagerOptions& opts)
   : env_(DCHECK_NOTNULL(env)),
     read_only_(opts.read_only),
-    dd_manager_(env, opts.metric_entity, kBlockManagerType, opts.root_paths),
+    dd_manager_(dd_manager),
     error_manager_(DCHECK_NOTNULL(error_manager)),
     file_cache_("fbm", env_, GetFileCacheCapacityForBlockManager(env_),
                 opts.metric_entity),
@@ -586,28 +584,12 @@ FileBlockManager::FileBlockManager(Env* env,
 FileBlockManager::~FileBlockManager() {
 }
 
-Status FileBlockManager::Create() {
-  CHECK(!read_only_);
-  return dd_manager_.Create(
-      FLAGS_enable_data_block_fsync ? DataDirManager::FLAG_CREATE_FSYNC : 0);
-}
-
 Status FileBlockManager::Open(FsReport* report) {
-  DataDirManager::LockMode mode;
-  if (!FLAGS_block_manager_lock_dirs) {
-    mode = DataDirManager::LockMode::NONE;
-  } else if (read_only_) {
-    mode = DataDirManager::LockMode::OPTIONAL;
-  } else {
-    mode = DataDirManager::LockMode::MANDATORY;
-  }
-  RETURN_NOT_OK(dd_manager_.Open(kMaxPaths, mode));
-
   RETURN_NOT_OK(file_cache_.Init());
 
   // Prepare the filesystem report and either return or log it.
   FsReport local_report;
-  for (const auto& dd : dd_manager_.data_dirs()) {
+  for (const auto& dd : dd_manager_->data_dirs()) {
     // TODO(adar): probably too expensive to fill out the stats/checks.
     local_report.data_dirs.push_back(dd->dir());
   }
@@ -624,9 +606,9 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   CHECK(!read_only_);
 
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_.GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
   uint16_t uuid_idx;
-  CHECK(dd_manager_.FindUuidIndexByDataDir(dir, &uuid_idx));
+  CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
 
   string path;
   vector<string> created_dirs;
@@ -686,7 +668,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-      error_manager_->RunErrorNotificationCb(dd_manager()->FindDataDirByUuidIndex( \
+      error_manager_->RunErrorNotificationCb(dd_manager_->FindDataDirByUuidIndex( \
       internal::FileBlockLocation::GetDataDirIdx(block_id)))); \
 } while (0);
 
@@ -789,7 +771,7 @@ void GetAllBlockIdsForDataDir(Env* env,
 } // anonymous namespace
 
 Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
-  const auto& dds = dd_manager_.data_dirs();
+  const auto& dds = dd_manager_->data_dirs();
   block_ids->clear();
 
   // The FBM does not maintain block listings in memory, so off we go to the
@@ -803,7 +785,7 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
                              &block_id_vecs[i],
                              &statuses[i]));
   }
-  for (const auto& dd : dd_manager_.data_dirs()) {
+  for (const auto& dd : dd_manager_->data_dirs()) {
     dd->WaitOnClosures();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index afd02c7..f62c900 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -70,15 +70,14 @@ struct BlockManagerMetrics;
 // The file-backed block manager.
 class FileBlockManager : public BlockManager {
  public:
-
-  // Note: 'env' and 'error_manager' should remain alive for the lifetime of
-  // the block manager.
-  FileBlockManager(Env* env, FsErrorManager* error_manager, const BlockManagerOptions& opts);
+  // Note: all objects passed as pointers should remain alive for the lifetime
+  // of the block manager.
+  FileBlockManager(Env* env, DataDirManager* dd_manager,
+                   FsErrorManager* error_manager,
+                   const BlockManagerOptions& opts);
 
   virtual ~FileBlockManager();
 
-  Status Create() override;
-
   Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
@@ -93,8 +92,6 @@ class FileBlockManager : public BlockManager {
 
   Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
 
-  DataDirManager* dd_manager() override { return &dd_manager_; }
-
   FsErrorManager* error_manager() override { return error_manager_; }
 
  private:
@@ -118,8 +115,9 @@ class FileBlockManager : public BlockManager {
   // If true, only read operations are allowed.
   const bool read_only_;
 
-  // Manages and owns all of the block manager's data directories.
-  DataDirManager dd_manager_;
+  // Manages and owns the data directories in which the block manager will
+  // place its blocks.
+  DataDirManager* dd_manager_;
 
   // Manages callbacks used to handle disk failure.
   FsErrorManager* error_manager_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/fs_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index f0e34e0..34bbe36 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -297,6 +297,7 @@ TEST_F(FsManagerTestBase, TestTmpFilesCleanup) {
   ASSERT_EQ(6, n_tmp_files);
 
   // Opening fs_manager should remove tmp files
+  ReinitFsManager(wal_path, data_paths);
   ASSERT_OK(fs_manager()->Open());
 
   n_tmp_files = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 5859476..d534d94 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -65,10 +65,17 @@ TAG_FLAG(enable_data_block_fsync, unsafe);
 #if defined(__linux__)
 DEFINE_string(block_manager, "log", "Which block manager to use for storage. "
               "Valid options are 'file' and 'log'.");
+static bool ValidateBlockManagerType(const char* /*flagname*/, const std::string& value) {
+  return value == "log" || value == "file";
+}
 #else
 DEFINE_string(block_manager, "file", "Which block manager to use for storage. "
               "Only the file block manager is supported for non-Linux systems.");
+static bool ValidateBlockManagerType(const char* /*flagname*/, const std::string& value) {
+  return value == "file";
+}
 #endif
+DEFINE_validator(block_manager, &ValidateBlockManagerType);
 TAG_FLAG(block_manager, advanced);
 
 DEFINE_string(fs_wal_dir, "",
@@ -85,6 +92,7 @@ using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::BlockManagerOptions;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::DataDirManager;
+using kudu::fs::DataDirManagerOptions;
 using kudu::fs::FsErrorManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
@@ -228,9 +236,6 @@ Status FsManager::Init() {
     VLOG(1) << "All roots: " << canonicalized_all_fs_roots_;
   }
 
-  // With the data roots canonicalized, we can initialize the block manager.
-  InitBlockManager();
-
   initted_ = true;
   return Status::OK();
 }
@@ -239,14 +244,11 @@ void FsManager::InitBlockManager() {
   BlockManagerOptions opts;
   opts.metric_entity = metric_entity_;
   opts.parent_mem_tracker = parent_mem_tracker_;
-  opts.root_paths = GetDataRootDirs();
   opts.read_only = read_only_;
   if (FLAGS_block_manager == "file") {
-    block_manager_.reset(new FileBlockManager(env_, error_manager_.get(), opts));
-  } else if (FLAGS_block_manager == "log") {
-    block_manager_.reset(new LogBlockManager(env_, error_manager_.get(), opts));
+    block_manager_.reset(new FileBlockManager(env_, dd_manager_.get(), error_manager_.get(), opts));
   } else {
-    LOG(FATAL) << "Invalid block manager: " << FLAGS_block_manager;
+    block_manager_.reset(new LogBlockManager(env_, dd_manager_.get(), error_manager_.get(), opts));
   }
 }
 
@@ -279,9 +281,25 @@ Status FsManager::Open(FsReport* report) {
     CheckAndFixPermissions();
   }
 
+  // Open the directory manager if it has not been opened already.
+  if (!dd_manager_) {
+    DataDirManagerOptions dm_opts;
+    dm_opts.metric_entity = metric_entity_;
+    dm_opts.read_only = read_only_;
+    vector<string> canonicalized_data_roots(canonicalized_data_fs_roots_.begin(),
+                                            canonicalized_data_fs_roots_.end());
+    LOG_TIMING(INFO, "opening directory manager") {
+      RETURN_NOT_OK(DataDirManager::OpenExisting(env_,
+          canonicalized_data_roots, dm_opts, &dd_manager_));
+    }
+  }
+
+  // Finally, initialize and open the block manager.
+  InitBlockManager();
   LOG_TIMING(INFO, "opening block manager") {
     RETURN_NOT_OK(block_manager_->Open(report));
   }
+
   LOG(INFO) << "Opened local filesystem: " << JoinStrings(canonicalized_all_fs_roots_, ",")
             << std::endl << SecureDebugString(*metadata_);
   return Status::OK();
@@ -352,8 +370,16 @@ Status FsManager::CreateInitialFileSystemLayout(boost::optional<string> uuid) {
     }
   }
 
-  // And lastly, the block manager.
-  RETURN_NOT_OK_PREPEND(block_manager_->Create(), "Unable to create block manager");
+  // And lastly, create the directory manager.
+  DataDirManagerOptions opts;
+  opts.metric_entity = metric_entity_;
+  opts.read_only = read_only_;
+  vector<string> canonicalized_data_roots(canonicalized_data_fs_roots_.begin(),
+                                          canonicalized_data_fs_roots_.end());
+  LOG_TIMING(INFO, "creating directory manager") {
+    RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew(env_,
+        canonicalized_data_roots, opts, &dd_manager_), "Unable to create directory manager");
+  }
 
   // Success: don't delete any files.
   for (ScopedFileDeleter* deleter : delete_on_failure) {
@@ -422,16 +448,8 @@ const string& FsManager::uuid() const {
 }
 
 vector<string> FsManager::GetDataRootDirs() const {
-  // Add the data subdirectory to each data root.
-  vector<string> data_paths;
-  for (const string& data_fs_root : canonicalized_data_fs_roots_) {
-    data_paths.push_back(JoinPathSegments(data_fs_root, kDataDirName));
-  }
-  return data_paths;
-}
-
-DataDirManager* FsManager::dd_manager() const {
-  return block_manager_->dd_manager();
+  // Get the data subdirectory for each data root.
+  return dd_manager_->GetDataDirs();
 }
 
 string FsManager::GetTabletMetadataDir() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index c5899f4..9b53794 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_FS_FS_MANAGER_H
-#define KUDU_FS_FS_MANAGER_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -118,7 +117,7 @@ class FsManager {
   //
   // If the filesystem has not been initialized, returns NotFound. In that
   // case, CreateInitialFileSystemLayout() may be used to initialize the
-  // on-disk structures.
+  // on-disk and in-memory structures.
   Status Open(fs::FsReport* report = nullptr);
 
   // Registers an error-handling callback with the FsErrorManager.
@@ -224,7 +223,9 @@ class FsManager {
 
   Status CreateDirIfMissing(const std::string& path, bool* created = NULL);
 
-  fs::DataDirManager* dd_manager() const;
+  fs::DataDirManager* dd_manager() const {
+    return dd_manager_.get();
+  }
 
   fs::BlockManager* block_manager() {
     return block_manager_.get();
@@ -310,6 +311,7 @@ class FsManager {
 
   std::unique_ptr<InstanceMetadataPB> metadata_;
 
+  std::unique_ptr<fs::DataDirManager> dd_manager_;
   std::unique_ptr<fs::BlockManager> block_manager_;
   std::unique_ptr<fs::FsErrorManager> error_manager_;
 
@@ -320,4 +322,3 @@ class FsManager {
 
 } // namespace kudu
 
-#endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index d022ff7..c06d540 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -22,6 +22,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
@@ -73,27 +74,38 @@ class LogBlockManagerTest : public KuduTest {
   }
 
   void SetUp() override {
-    CHECK_OK(bm_->Create());
-
     // Pass in a report to prevent the block manager from logging unnecessarily.
     FsReport report;
-    CHECK_OK(bm_->Open(&report));
-    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
-    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+    ASSERT_OK(bm_->Open(&report));
+    ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+    ASSERT_TRUE(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
  protected:
   LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity) {
+    if (!dd_manager_) {
+      // Ensure the directory manager is initialized.
+      CHECK_OK(DataDirManager::CreateNew(env_, { test_dir_ },
+          DataDirManagerOptions(), &dd_manager_));
+    }
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
-    opts.root_paths = { test_dir_ };
-    return new LogBlockManager(env_, test_error_manager_.get(), opts);
+    return new LogBlockManager(env_, dd_manager_.get(), test_error_manager_.get(), opts);
   }
 
-  Status ReopenBlockManager(FsReport* report = nullptr) {
-    bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>()));
+  Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = nullptr,
+                            FsReport* report = nullptr) {
+    // The directory manager must outlive the block manager. Destroy the block
+    // manager first to enforce this.
+    bm_.reset();
+
+    // Re-open the directory manager first to clear any in-memory maps.
+    RETURN_NOT_OK(DataDirManager::OpenExisting(env_, { test_dir_ },
+        DataDirManagerOptions(), &dd_manager_));
+
+    bm_.reset(CreateBlockManager(metric_entity));
     RETURN_NOT_OK(bm_->Open(report));
-    RETURN_NOT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
+    RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
     return Status::OK();
   }
 
@@ -149,6 +161,7 @@ class LogBlockManagerTest : public KuduTest {
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
 
+  unique_ptr<DataDirManager> dd_manager_;
   unique_ptr<FsErrorManager> test_error_manager_;
   unique_ptr<LogBlockManager> bm_;
 
@@ -162,13 +175,15 @@ class LogBlockManagerTest : public KuduTest {
     // Populate 'data_files' and 'metadata_files'.
     vector<string> data_files;
     vector<string> metadata_files;
-    vector<string> children;
-    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    for (const string& child : children) {
-      if (HasSuffixString(child, LogBlockManager::kContainerDataFileSuffix)) {
-        data_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
-      } else if (HasSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix)) {
-        metadata_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
+    for (const string& data_dir : dd_manager_->GetDataDirs()) {
+      vector<string> children;
+      ASSERT_OK(env_->GetChildren(data_dir, &children));
+      for (const string& child : children) {
+        if (HasSuffixString(child, LogBlockManager::kContainerDataFileSuffix)) {
+          data_files.push_back(JoinPathSegments(data_dir, child));
+        } else if (HasSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix)) {
+          metadata_files.push_back(JoinPathSegments(data_dir, child));
+        }
       }
     }
 
@@ -221,9 +236,7 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
 TEST_F(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  bm_.reset(CreateBlockManager(entity));
-  ASSERT_OK(bm_->Open(nullptr));
-  ASSERT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
+  ASSERT_OK(ReopenBlockManager(entity));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -269,8 +282,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   // persistent information so they should be the same.
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  bm_.reset(CreateBlockManager(new_entity));
-  ASSERT_OK(bm_->Open(nullptr));
+  ASSERT_OK(ReopenBlockManager(new_entity));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
 
   // Delete a block. Its contents should no longer be under management.
@@ -796,7 +808,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add a mixture of regular and misaligned blocks to it.
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   int num_misaligned_blocks = 0;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -807,7 +819,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
       // container metadata writers do not expect the metadata files to have
       // been changed underneath them.
       FsReport report;
-      ASSERT_OK(ReopenBlockManager(&report));
+      ASSERT_OK(ReopenBlockManager(nullptr, &report));
       ASSERT_FALSE(report.HasFatalErrors());
       num_misaligned_blocks++;
     } else {
@@ -834,7 +846,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
     }
   }
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors()) << report.ToString();
   ASSERT_EQ(num_misaligned_blocks, report.misaligned_block_check->entries.size());
   for (const auto& mb : report.misaligned_block_check->entries) {
@@ -854,7 +866,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   // do this by reopening it; shutdown will wait for outstanding hole punches.
   //
   // On reopen, some misaligned blocks should be gone from the report.
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_GT(report.misaligned_block_check->entries.size(), 0);
   ASSERT_LT(report.misaligned_block_check->entries.size(), num_misaligned_blocks);
@@ -908,13 +920,13 @@ TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
   NO_FATALS(GetContainerNames(&container_names));
 
   // Corrupt one container.
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   ASSERT_OK(corruptor.PreallocateFullContainer());
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -953,7 +965,7 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_EQ(0, file_size_on_disk);
 
   // Add some "unpunched blocks" to the container.
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor.AddUnpunchedBlockToFullContainer());
@@ -964,7 +976,7 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -980,7 +992,7 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   // Wait for the block manager to punch out all of the holes (done as part of
   // repair at startup). It's easiest to do this by reopening it; shutdown will
   // wait for outstanding hole punches.
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   NO_FATALS(AssertEmptyReport(report));
 
   // File size should be 0 post-repair.
@@ -994,7 +1006,7 @@ TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
   // Create some incomplete containers. The corruptor will select between
   // several variants of "incompleteness" at random (see
   // LBMCorruptor::CreateIncompleteContainer() for details).
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumContainers; i++) {
     ASSERT_OK(corruptor.CreateIncompleteContainer());
@@ -1005,7 +1017,7 @@ TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1032,7 +1044,7 @@ TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
   // Add some malformed records. The corruptor will select between
   // several variants of "malformedness" at random (see
   // LBMCorruptor::AddMalformedRecordToContainer for details).
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor.AddMalformedRecordToContainer());
@@ -1040,7 +1052,7 @@ TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_TRUE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size());
   for (const auto& mr : report.malformed_record_check->entries) {
@@ -1062,7 +1074,7 @@ TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add some misaligned blocks.
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
@@ -1070,7 +1082,7 @@ TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size());
   uint64_t fs_block_size;
@@ -1101,7 +1113,7 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
   ASSERT_EQ(kNumContainers, container_names.size());
 
   // Add some partial records.
-  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  LBMCorruptor corruptor(env_, dd_manager_->GetDataDirs(), SeedRandom());
   ASSERT_OK(corruptor.Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor.AddPartialRecordToContainer());
@@ -1109,7 +1121,7 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1182,7 +1194,7 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
     ASSERT_OK(bm_->DeleteBlock(id));
     num_blocks_deleted++;
     FsReport report;
-    ASSERT_OK(ReopenBlockManager(&report));
+    ASSERT_OK(ReopenBlockManager(nullptr, &report));
     last_live_aligned_bytes = report.stats.live_block_bytes_aligned;
 
     ASSERT_OK(env_->GetFileSize(metadata_file_name, &post_compaction_file_size));
@@ -1200,7 +1212,7 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   // dead blocks that were removed) shouldn't affect the number of live bytes
   // post-alignment.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned);
 }
 
@@ -1221,7 +1233,7 @@ TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
   // consume a lot of file descriptors).
   FLAGS_log_container_max_blocks = 4;
   // Reopen so the flags take effect.
-  ASSERT_OK(ReopenBlockManager(nullptr));
+  ASSERT_OK(ReopenBlockManager());
 
   // Create many container with a bunch of blocks, half of which are deleted.
   vector<BlockId> block_ids;
@@ -1240,7 +1252,7 @@ TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
   // files, since we've deleted half the blocks in every container and the
   // threshold is set high above.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
 
   // Delete the remaining blocks in a random order. This will append to metadata
   // files which have just been compacted. Since we have more metadata files than
@@ -1253,7 +1265,7 @@ TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
 
   // Reopen to make sure that the metadata can be properly loaded and
   // that the resulting block manager is empty.
-  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_EQ(0, report.stats.live_block_count);
   ASSERT_EQ(0, report.stats.live_block_bytes_aligned);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 273a18c..415248d 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -1476,8 +1476,6 @@ size_t LogReadableBlock::memory_footprint() const {
 const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata";
 const char* LogBlockManager::kContainerDataFileSuffix = ".data";
 
-static const char* kBlockManagerType = "log";
-
 // These values were arrived at via experimentation. See commit 4923a74 for
 // more details.
 const map<int64_t, int64_t> LogBlockManager::kPerFsBlockSizeBlockLimits({
@@ -1486,12 +1484,13 @@ const map<int64_t, int64_t> LogBlockManager::kPerFsBlockSizeBlockLimits({
   { 4096, 2721 }});
 
 LogBlockManager::LogBlockManager(Env* env,
+                                 DataDirManager* dd_manager,
                                  FsErrorManager* error_manager,
                                  const BlockManagerOptions& opts)
   : mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
                                            opts.parent_mem_tracker)),
-    dd_manager_(env, opts.metric_entity, kBlockManagerType, opts.root_paths),
+    dd_manager_(DCHECK_NOTNULL(dd_manager)),
     error_manager_(DCHECK_NOTNULL(error_manager)),
     file_cache_("lbm", env, GetFileCacheCapacityForBlockManager(env),
                 opts.metric_entity),
@@ -1534,38 +1533,20 @@ LogBlockManager::~LogBlockManager() {
   // destroyed before their containers.
   blocks_by_block_id_.clear();
 
-  // Containers may have outstanding tasks running on data directories; shut
-  // them down before destroying the containers.
-  dd_manager_.Shutdown();
+  // Containers may have outstanding tasks running on data directories; wait
+  // for them to complete before destroying the containers.
+  dd_manager_->WaitOnClosures();
 
   STLDeleteValues(&all_containers_by_name_);
 }
 
-Status LogBlockManager::Create() {
-  CHECK(!read_only_);
-  return dd_manager_.Create(FLAGS_enable_data_block_fsync ?
-      DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH | DataDirManager::FLAG_CREATE_FSYNC :
-      DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH);
-}
-
 Status LogBlockManager::Open(FsReport* report) {
-  DataDirManager::LockMode mode;
-  if (!FLAGS_block_manager_lock_dirs) {
-    mode = DataDirManager::LockMode::NONE;
-  } else if (read_only_) {
-    mode = DataDirManager::LockMode::OPTIONAL;
-  } else {
-    mode = DataDirManager::LockMode::MANDATORY;
-  }
-  RETURN_NOT_OK(dd_manager_.Open(kuint32max, mode));
-
   RETURN_NOT_OK(file_cache_.Init());
 
   // Establish (and log) block limits for each data directory using kernel,
   // filesystem, and gflags information.
-  for (const auto& dd : dd_manager_.data_dirs()) {
+  for (const auto& dd : dd_manager_->data_dirs()) {
     boost::optional<int64_t> limit;
-
     if (FLAGS_log_container_max_blocks == -1) {
       // No limit, unless this is KUDU-1508.
 
@@ -1601,10 +1582,10 @@ Status LogBlockManager::Open(FsReport* report) {
     InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
   }
 
-  vector<FsReport> reports(dd_manager_.data_dirs().size());
-  vector<Status> statuses(dd_manager_.data_dirs().size());
+  vector<FsReport> reports(dd_manager_->data_dirs().size());
+  vector<Status> statuses(dd_manager_->data_dirs().size());
   int i = 0;
-  for (const auto& dd : dd_manager_.data_dirs()) {
+  for (const auto& dd : dd_manager_->data_dirs()) {
     // Open the data dir asynchronously.
     dd->ExecClosure(
         Bind(&LogBlockManager::OpenDataDir,
@@ -1616,10 +1597,10 @@ Status LogBlockManager::Open(FsReport* report) {
   }
 
   // Wait for the opens to complete.
-  for (const auto& dd : dd_manager_.data_dirs()) {
+  for (const auto& dd : dd_manager_->data_dirs()) {
     dd->WaitOnClosures();
   }
-  if (dd_manager_.GetFailedDataDirs().size() == dd_manager_.data_dirs().size()) {
+  if (dd_manager_->GetFailedDataDirs().size() == dd_manager_->data_dirs().size()) {
     return Status::IOError("All data dirs failed to open", "", EIO);
   }
 
@@ -1638,7 +1619,7 @@ Status LogBlockManager::Open(FsReport* report) {
       return s;
     }
     LOG(ERROR) << Substitute("Not using report from $0: $1",
-        dd_manager_.data_dirs()[i]->dir(), s.ToString());
+        dd_manager_->data_dirs()[i]->dir(), s.ToString());
   }
 
   // Either return or log the report.
@@ -1792,7 +1773,7 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name)
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainer** container) {
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_.GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
 
   {
     std::lock_guard<simple_spinlock> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 3c52602..1474cee 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -162,14 +162,14 @@ class LogBlockManager : public BlockManager {
   static const char* kContainerMetadataFileSuffix;
   static const char* kContainerDataFileSuffix;
 
-  // Note: 'env' and 'error_manager' should remain alive for the lifetime of
-  // the block manager.
-  LogBlockManager(Env* env, FsErrorManager* error_manager, const BlockManagerOptions& opts);
+  // Note: all objects passed as pointers should remain alive for the lifetime
+  // of the block manager.
+  LogBlockManager(Env* env, DataDirManager* dd_manager,
+                  FsErrorManager* error_manager,
+                  const BlockManagerOptions& opts);
 
   virtual ~LogBlockManager();
 
-  Status Create() override;
-
   Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
@@ -184,8 +184,6 @@ class LogBlockManager : public BlockManager {
 
   Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
 
-  DataDirManager* dd_manager() override { return &dd_manager_; }
-
   FsErrorManager* error_manager() override { return error_manager_; }
 
  private:
@@ -354,8 +352,9 @@ class LogBlockManager : public BlockManager {
   // Protects the block map, container structures, and 'dirty_dirs'.
   mutable simple_spinlock lock_;
 
-  // Manages and owns all of the block manager's data directories.
-  DataDirManager dd_manager_;
+  // The manager of the data directories in which the block manager places its
+  // blocks. Not owned: it must outlive the block manager.
+  DataDirManager* dd_manager_;
 
   // Manages callbacks used to handle disk failure.
   FsErrorManager* error_manager_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/util/path_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/path_util.cc b/src/kudu/util/path_util.cc
index 201bac6..819cc40 100644
--- a/src/kudu/util/path_util.cc
+++ b/src/kudu/util/path_util.cc
@@ -44,8 +44,7 @@ namespace kudu {
 const char kTmpInfix[] = ".kudutmp";
 const char kOldTmpInfix[] = ".tmp";
 
-std::string JoinPathSegments(const std::string &a,
-                             const std::string &b) {
+string JoinPathSegments(const string& a, const string& b) {
   CHECK(!a.empty()) << "empty first component: " << a;
   CHECK(!b.empty() && b[0] != '/')
     << "second path component must be non-empty and relative: "
@@ -57,6 +56,14 @@ std::string JoinPathSegments(const std::string &a,
   }
 }
 
+// Returns JoinPathSegments(path, s) for each 'path' in 'v', in the same order.
+vector<string> JoinPathSegmentsV(const vector<string>& v, const string& s) {
+  vector<string> out;
+  out.reserve(v.size());  // Exactly one joined path per input element.
+  for (const string& path : v) out.emplace_back(JoinPathSegments(path, s));
+  return out;
+}
+
 vector<string> SplitPath(const string& path) {
   if (path.empty()) return {};
   vector<string> segments;

http://git-wip-us.apache.org/repos/asf/kudu/blob/61a22773/src/kudu/util/path_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/path_util.h b/src/kudu/util/path_util.h
index bb5c631..52501b0 100644
--- a/src/kudu/util/path_util.h
+++ b/src/kudu/util/path_util.h
@@ -31,8 +31,12 @@ extern const char kOldTmpInfix[];
 
 // Join two path segments with the appropriate path separator,
 // if necessary.
-std::string JoinPathSegments(const std::string &a,
-                             const std::string &b);
+std::string JoinPathSegments(const std::string& a,
+                             const std::string& b);
+
+// Returns JoinPathSegments(p, s) for each path 'p' in 'v', preserving order.
+std::vector<std::string> JoinPathSegmentsV(const std::vector<std::string>& v,
+                                           const std::string& s);
 
 // Split a path into segments with the appropriate path separator.
 std::vector<std::string> SplitPath(const std::string& path);


Mime
View raw message