kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [2/2] kudu git commit: KUDU-1726: Avoid fsync-per-block in tablet copy
Date Fri, 25 Aug 2017 02:17:49 GMT
KUDU-1726: Avoid fsync-per-block in tablet copy

This patch incorporates the BlockTransaction API into tablet
copy to avoid calling fsync() on each block during copying. Instead,
blocks are synced to disk together once the tablet copy is complete.
It also introduces a new block manager metric, 'total_disk_sync',
to keep track of the block data disk synchronization count.

I ran a manual test that copied a ~37GB tablet from one
tablet server to another on el6 with ext4. Each tablet server
has its own single dedicated disk.
'kudu remote_replica copy c53ffbc2ede84b6d9af2da607024d131
localhost:3334 localhost:3335'

With this change, the operation time dropped from ~718s
to ~523s.

Change-Id: I7534699f589c7060ffe32d7ac67546476cf21e76
Reviewed-on: http://gerrit.cloudera.org:8080/7701
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3d42a264
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3d42a264
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3d42a264

Branch: refs/heads/master
Commit: 3d42a264c15ded1001284a851b7f30a2b4465411
Parents: 59ca360
Author: hahao <hao.hao@cloudera.com>
Authored: Thu Aug 24 15:52:08 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Fri Aug 25 02:16:43 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager_metrics.cc        |  8 +++++-
 src/kudu/fs/block_manager_metrics.h         |  1 +
 src/kudu/fs/file_block_manager.cc           |  2 ++
 src/kudu/fs/log_block_manager.cc            |  3 +++
 src/kudu/tserver/tablet_copy_client-test.cc | 32 ++++++++++++++++++++++--
 src/kudu/tserver/tablet_copy_client.cc      | 25 ++++++++++--------
 src/kudu/tserver/tablet_copy_client.h       | 17 ++++++++++---
 7 files changed, 71 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/fs/block_manager_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager_metrics.cc b/src/kudu/fs/block_manager_metrics.cc
index c53f26f..26bb50e 100644
--- a/src/kudu/fs/block_manager_metrics.cc
+++ b/src/kudu/fs/block_manager_metrics.cc
@@ -49,6 +49,11 @@ METRIC_DEFINE_counter(server, block_manager_total_bytes_read,
                       kudu::MetricUnit::kBytes,
                       "Number of bytes of block data read since service start");
 
+METRIC_DEFINE_counter(server, block_manager_total_disk_sync,
+                      "Block Data Disk Synchronization Count",
+                      kudu::MetricUnit::kBlocks,
+                      "Number of disk synchronizations of block data since service start");
+
 namespace kudu {
 namespace fs {
 namespace internal {
@@ -61,7 +66,8 @@ BlockManagerMetrics::BlockManagerMetrics(const scoped_refptr<MetricEntity>& enti
     MINIT(total_readable_blocks),
     MINIT(total_writable_blocks),
     MINIT(total_bytes_read),
-    MINIT(total_bytes_written) {
+    MINIT(total_bytes_written),
+    MINIT(total_disk_sync) {
 }
 #undef GINIT
 #undef MINIT

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/fs/block_manager_metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager_metrics.h b/src/kudu/fs/block_manager_metrics.h
index 760f98b..03d9872 100644
--- a/src/kudu/fs/block_manager_metrics.h
+++ b/src/kudu/fs/block_manager_metrics.h
@@ -36,6 +36,7 @@ struct BlockManagerMetrics {
   scoped_refptr<Counter> total_writable_blocks;
   scoped_refptr<Counter> total_bytes_read;
   scoped_refptr<Counter> total_bytes_written;
+  scoped_refptr<Counter> total_disk_sync;
 };
 
 } // namespace internal

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index bd5d35d..dda2a31 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -390,6 +390,7 @@ Status FileWritableBlock::Close(SyncMode mode) {
     // Safer to synchronize data first, then metadata.
     VLOG(3) << "Syncing block " << id();
     if (FLAGS_enable_data_block_fsync) {
+      if (block_manager_->metrics_) block_manager_->metrics_->total_disk_sync->Increment();
       sync = writer_->Sync();
     }
     if (sync.ok()) {
@@ -559,6 +560,7 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
   // Sync them.
   if (FLAGS_enable_data_block_fsync) {
     for (const string& s : to_sync) {
+      if (metrics_) metrics_->total_disk_sync->Increment();
       RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
           error_manager_->RunErrorNotificationCb(location.data_dir()));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index cf80d4e..2b06afa 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -991,6 +991,7 @@ Status LogBlockContainer::FlushMetadata() {
 Status LogBlockContainer::SyncData() {
   DCHECK(!read_only());
   if (FLAGS_enable_data_block_fsync) {
+    if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
     RETURN_NOT_OK_HANDLE_ERROR(data_file_->Sync());
   }
   return Status::OK();
@@ -999,6 +1000,7 @@ Status LogBlockContainer::SyncData() {
 Status LogBlockContainer::SyncMetadata() {
   DCHECK(!read_only());
   if (FLAGS_enable_data_block_fsync) {
+    if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
     RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Sync());
   }
   return Status::OK();
@@ -1886,6 +1888,7 @@ Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
   }
 
   if (to_sync && FLAGS_enable_data_block_fsync) {
+    if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
     s = env_->SyncDir(container.data_dir()->dir());
 
     // If SyncDir fails, the container directory must be restored to

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index c6afa49..e299257 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -38,6 +39,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -53,6 +55,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
@@ -63,6 +66,12 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+DECLARE_string(block_manager);
+
+METRIC_DECLARE_counter(block_manager_total_disk_sync);
+METRIC_DECLARE_counter(block_manager_total_writable_blocks);
+METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
+
 namespace kudu {
 namespace tserver {
 
@@ -78,7 +87,13 @@ class TabletCopyClientTest : public TabletCopyTest {
   virtual void SetUp() OVERRIDE {
     NO_FATALS(TabletCopyTest::SetUp());
 
-    fs_manager_.reset(new FsManager(Env::Default(), GetTestPath("client_tablet")));
+    const string kTestDir = GetTestPath("client_tablet");
+    FsManagerOpts opts;
+    opts.wal_path = kTestDir;
+    opts.data_paths = { kTestDir };
+    metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, "test");
+    opts.metric_entity = metric_entity_;
+    fs_manager_.reset(new FsManager(Env::Default(), opts));
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
 
@@ -101,6 +116,8 @@ class TabletCopyClientTest : public TabletCopyTest {
  protected:
   Status CompareFileContents(const string& path1, const string& path2);
 
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
   gscoped_ptr<FsManager> fs_manager_;
   shared_ptr<rpc::Messenger> messenger_;
   gscoped_ptr<TabletCopyClient> client_;
@@ -160,7 +177,9 @@ TEST_F(TabletCopyClientTest, TestDownloadBlock) {
 
   // Check that the client downloaded the block and verification passed.
   BlockId new_block_id;
-  ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));
+  fs::BlockTransaction transaction;
+  ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id, &transaction));
+  ASSERT_OK(transaction.CommitCreatedBlocks());
 
   // Ensure it placed the block where we expected it to.
   ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
@@ -226,6 +245,15 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
   // Download all the blocks.
   ASSERT_OK(client_->DownloadBlocks());
 
+  // Verify the disk synchronization count.
+  if (FLAGS_block_manager == "log") {
+    ASSERT_EQ(3, down_cast<Counter*>(
+        metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
+  } else {
+    ASSERT_EQ(14, down_cast<Counter*>(
+        metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
+  }
+
   // After downloading blocks, verify that the old and remote and local
   // superblock point to the same number of blocks.
   vector<BlockId> old_data_blocks = ListBlocks(*client_->remote_superblock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 07b20ff..2b894fe 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -91,6 +91,7 @@ using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using env_util::CopyFile;
+using fs::BlockTransaction;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
 using rpc::Messenger;
@@ -458,6 +459,7 @@ Status TabletCopyClient::DownloadBlocks() {
   // as each block downloads.
   int block_count = 0;
   LOG_WITH_PREFIX(INFO) << "Starting download of " << num_remote_blocks << " data blocks...";
+  BlockTransaction transaction;
   for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) {
     // Create rowset.
     RowSetDataPB* dst_rowset = superblock_->add_rowsets();
@@ -478,7 +480,7 @@ Status TabletCopyClient::DownloadBlocks() {
     for (const ColumnDataPB& src_col : src_rowset.columns()) {
       BlockIdPB new_block_id;
       RETURN_NOT_OK(DownloadAndRewriteBlock(src_col.block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
+                                            &block_count, &new_block_id, &transaction));
       ColumnDataPB* dst_col = dst_rowset->add_columns();
       *dst_col = src_col;
       *dst_col->mutable_block() = new_block_id;
@@ -486,7 +488,7 @@ Status TabletCopyClient::DownloadBlocks() {
     for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) {
       BlockIdPB new_block_id;
       RETURN_NOT_OK(DownloadAndRewriteBlock(src_redo.block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
+                                            &block_count, &new_block_id, &transaction));
       DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas();
       *dst_redo = src_redo;
       *dst_redo->mutable_block() = new_block_id;
@@ -494,7 +496,7 @@ Status TabletCopyClient::DownloadBlocks() {
     for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) {
       BlockIdPB new_block_id;
       RETURN_NOT_OK(DownloadAndRewriteBlock(src_undo.block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
+                                            &block_count, &new_block_id, &transaction));
       DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas();
       *dst_undo = src_undo;
       *dst_undo->mutable_block() = new_block_id;
@@ -502,18 +504,18 @@ Status TabletCopyClient::DownloadBlocks() {
     if (src_rowset.has_bloom_block()) {
       BlockIdPB new_block_id;
       RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.bloom_block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
+                                            &block_count, &new_block_id, &transaction));
       *dst_rowset->mutable_bloom_block() = new_block_id;
     }
     if (src_rowset.has_adhoc_index_block()) {
       BlockIdPB new_block_id;
       RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.adhoc_index_block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
+                                            &block_count, &new_block_id, &transaction));
       *dst_rowset->mutable_adhoc_index_block() = new_block_id;
     }
   }
 
-  return Status::OK();
+  return transaction.CommitCreatedBlocks();
 }
 
 Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
@@ -563,13 +565,14 @@ Status TabletCopyClient::WriteConsensusMetadata() {
 Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
                                                  int num_blocks,
                                                  int* block_count,
-                                                 BlockIdPB* dest_block_id) {
+                                                 BlockIdPB* dest_block_id,
+                                                 BlockTransaction* transaction) {
   BlockId old_block_id(BlockId::FromPB(src_block_id));
   SetStatusMessage(Substitute("Downloading block $0 ($1/$2)",
                               old_block_id.ToString(),
                               *block_count + 1, num_blocks));
   BlockId new_block_id;
-  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
+  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id, transaction),
       "Unable to download block with id " + old_block_id.ToString());
 
   new_block_id.CopyToPB(dest_block_id);
@@ -578,7 +581,8 @@ Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
 }
 
 Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
-                                       BlockId* new_block_id) {
+                                       BlockId* new_block_id,
+                                       BlockTransaction* transaction) {
   VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();
 
   unique_ptr<WritableBlock> block;
@@ -593,7 +597,8 @@ Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
                                    old_block_id.ToString()));
 
   *new_block_id = block->id();
-  RETURN_NOT_OK_PREPEND(block->Close(), "Unable to close block");
+  RETURN_NOT_OK_PREPEND(block->Finalize(), "Unable to finalize block");
+  transaction->AddCreatedBlock(std::move(block));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/3d42a264/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 429c05b..f8a47bc 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -41,6 +41,10 @@ class ConsensusMetadataManager;
 class ConsensusStatePB;
 } // namespace consensus
 
+namespace fs {
+class BlockTransaction;
+} // namespace fs
+
 namespace rpc {
 class Messenger;
 class RpcController;
@@ -158,7 +162,8 @@ class TabletCopyClient {
 
   // Download the remote block specified by 'src_block_id'. 'num_blocks' should
   // be given as the total number of blocks there are to download (for logging
-  // purposes).
+  // purposes). Add the block to the given transaction, to close blocks belonging
+  // to a transaction together when the copying is complete.
   //
   // On success:
   // - 'dest_block_id' is set to the new ID of the downloaded block.
@@ -166,13 +171,17 @@ class TabletCopyClient {
   Status DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
                                  int num_blocks,
                                  int* block_count,
-                                 BlockIdPB* dest_block_id);
+                                 BlockIdPB* dest_block_id,
+                                 fs::BlockTransaction* transaction);
 
   // Download a single block.
-  // Data block is opened with options so that it will fsync() on close.
+  // Data block is opened with new ID. After downloading, the block is finalized
+  // and added to the given transaction.
   //
   // On success, 'new_block_id' is set to the new ID of the downloaded block.
-  Status DownloadBlock(const BlockId& old_block_id, BlockId* new_block_id);
+  Status DownloadBlock(const BlockId& old_block_id,
+                       BlockId* new_block_id,
+                       fs::BlockTransaction* transaction);
 
   // Download a single remote file. The block and WAL implementations delegate
   // to this method when downloading files.


Mime
View raw message