kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [2/3] kudu git commit: KUDU-2469 pt 1: add an IOContext
Date Wed, 29 Aug 2018 01:51:22 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index b637e00..253a2c9 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -57,6 +57,10 @@ namespace consensus {
 class OpId;
 }
 
+namespace fs {
+struct IOContext;
+}
+
 namespace tablet {
 
 class DeltaFileWriter;
@@ -78,7 +82,7 @@ class DeltaMemStore : public DeltaStore,
                        std::shared_ptr<MemTracker> parent_tracker,
                        std::shared_ptr<DeltaMemStore>* dms);
 
-  virtual Status Init() OVERRIDE;
+  virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
 
   virtual bool Initted() OVERRIDE {
     return true;
@@ -122,7 +126,8 @@ class DeltaMemStore : public DeltaStore,
   virtual Status NewDeltaIterator(const RowIteratorOptions& opts,
                                   DeltaIterator** iterator) const OVERRIDE;
 
-  virtual Status CheckRowDeleted(rowid_t row_idx, bool *deleted) const OVERRIDE;
+  virtual Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
+                                 bool* deleted) const OVERRIDE;
 
   virtual uint64_t EstimateSize() const OVERRIDE {
     return arena_->memory_footprint();

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/diskrowset-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 9742169..ec3f4b3 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -186,7 +186,7 @@ class TestRowSet : public KuduRowSetTest {
     ProbeStats stats;
     ScopedTransaction tx(&mvcc_, clock_->Now());
     tx.StartApplying();
-    Status s = rs->MutateRow(tx.timestamp(), probe, mutation, op_id_, &stats, result);
+    Status s = rs->MutateRow(tx.timestamp(), probe, mutation, op_id_, nullptr, &stats, result);
     tx.Commit();
     return s;
   }
@@ -196,7 +196,7 @@ class TestRowSet : public KuduRowSetTest {
     BuildRowKey(&rb, row_idx);
     RowSetKeyProbe probe(rb.row());
     ProbeStats stats;
-    return rs.CheckRowPresent(probe, present, &stats);
+    return rs.CheckRowPresent(probe, nullptr, present, &stats);
   }
 
   // Verify the contents of the given rowset.
@@ -327,6 +327,7 @@ class TestRowSet : public KuduRowSetTest {
     return DiskRowSet::Open(rowset_meta_,
                             new log::LogAnchorRegistry(),
                             TabletMemTrackers(),
+                            nullptr,
                             rowset);
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/diskrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index ce0fb7d..7e89122 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -121,7 +121,7 @@ TEST_F(TestRowSet, TestRowSetRoundTrip) {
     rb.AddString(Slice("h"));
     RowSetKeyProbe probe(rb.row());
     bool present;
-    ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
+    ASSERT_OK(rs->CheckRowPresent(probe, nullptr, &present, &stats));
     ASSERT_FALSE(present);
   }
 
@@ -131,7 +131,7 @@ TEST_F(TestRowSet, TestRowSetRoundTrip) {
     rb.AddString(Slice("z"));
     RowSetKeyProbe probe(rb.row());
     bool present;
-    ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
+    ASSERT_OK(rs->CheckRowPresent(probe, nullptr, &present, &stats));
     ASSERT_FALSE(present);
   }
 
@@ -142,7 +142,7 @@ TEST_F(TestRowSet, TestRowSetRoundTrip) {
     rb.AddString(Slice("hello 00000000000049x"));
     RowSetKeyProbe probe(rb.row());
     bool present;
-    ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
+    ASSERT_OK(rs->CheckRowPresent(probe, nullptr, &present, &stats));
     ASSERT_FALSE(present);
   }
 
@@ -154,7 +154,7 @@ TEST_F(TestRowSet, TestRowSetRoundTrip) {
     rb.AddString(Slice(buf));
     RowSetKeyProbe probe(rb.row());
     bool present;
-    ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
+    ASSERT_OK(rs->CheckRowPresent(probe, nullptr, &present, &stats));
     ASSERT_TRUE(present);
   }
 }
@@ -188,7 +188,7 @@ TEST_F(TestRowSet, TestRowSetUpdate) {
 
   OperationResultPB result;
   ProbeStats stats;
-  Status s = rs->MutateRow(timestamp, probe, enc.as_changelist(), op_id_, &stats, &result);
+  Status s = rs->MutateRow(timestamp, probe, enc.as_changelist(), op_id_, nullptr, &stats, &result);
   ASSERT_TRUE(s.IsNotFound());
   ASSERT_EQ(0, result.mutated_stores_size());
 
@@ -219,7 +219,7 @@ TEST_F(TestRowSet, TestErrorDuringUpdate) {
   // The mutation should result in an IOError.
   OperationResultPB result;
   ProbeStats stats;
-  Status s = rs->MutateRow(timestamp, probe, enc.as_changelist(), op_id_, &stats, &result);
+  Status s = rs->MutateRow(timestamp, probe, enc.as_changelist(), op_id_, nullptr, &stats, &result);
   LOG(INFO) << s.ToString();
   ASSERT_TRUE(s.IsIOError());
 }
@@ -307,7 +307,7 @@ TEST_F(TestRowSet, TestDelete) {
       // Flush DMS. The second pass through the loop will re-verify that the
       // externally visible state of the layer has not changed.
       // deletions now in a DeltaFile.
-      ASSERT_OK(rs->FlushDeltas());
+      ASSERT_OK(rs->FlushDeltas(nullptr));
     }
   }
 }
@@ -330,7 +330,7 @@ TEST_F(TestRowSet, TestDMSFlush) {
     ASSERT_EQ(static_cast<int>(n_rows_ * FLAGS_update_fraction),
               rs->delta_tracker_->dms_->Count());
 
-    ASSERT_OK(rs->FlushDeltas());
+    ASSERT_OK(rs->FlushDeltas(nullptr));
 
     // Check that the DiskRowSet's DMS has now been emptied.
     ASSERT_EQ(0, rs->delta_tracker_->dms_->Count());
@@ -403,6 +403,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
                                           probe,
                                           RowChangeList(update_buf),
                                           op_id_,
+                                          nullptr,
                                           &stats,
                                           &result));
       ASSERT_EQ(1, result.mutated_stores_size());
@@ -429,7 +430,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
 
   // Flush deltas to disk and ensure that the historical versions are still
   // accessible.
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
 
   for (int i = 0; i < 5; i++) {
     SCOPED_TRACE(i);
@@ -469,7 +470,7 @@ TEST_F(TestRowSet, TestDeltaApplicationPerformance) {
       StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates in DMS)",
                    n_rows_, FLAGS_update_fraction * 100.0f,
                    FLAGS_n_read_passes));
-    ASSERT_OK(rs->FlushDeltas());
+    ASSERT_OK(rs->FlushDeltas(nullptr));
 
     BenchmarkIterationPerformance(*rs.get(),
       StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates on disk)",
@@ -505,15 +506,15 @@ TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
   shared_ptr<DiskRowSet> rs;
   ASSERT_OK(OpenTestRowSet(&rs));
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   DeltaTracker *dt = rs->delta_tracker();
   int num_stores = dt->redo_delta_stores_.size();
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
   unique_ptr<DeltaIterator> merge_iter;
-  ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(0, num_stores - 1, &schema_,
-                                                       &compacted_stores,
-                                                       &compacted_blocks, &merge_iter));
+  ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_,
+                                                &compacted_stores,
+                                                &compacted_blocks, &merge_iter));
   vector<string> results;
   ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_,
                                           ITERATE_OVER_ALL_ROWS,
@@ -550,7 +551,7 @@ TEST_F(TestRowSet, TestCompactStores) {
 
   // Write a first delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   // One file isn't enough for minor compactions, but a major compaction can run.
   ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
   NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
@@ -558,7 +559,7 @@ TEST_F(TestRowSet, TestCompactStores) {
 
   // Write a second delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   // Two files is enough for all delta compactions.
   NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
       RowSet::MINOR_DELTA_COMPACTION)));
@@ -567,7 +568,7 @@ TEST_F(TestRowSet, TestCompactStores) {
 
   // Write a third delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   // We're hitting the max for minor compactions but not for major compactions.
   ASSERT_EQ(1, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
   NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
@@ -578,7 +579,7 @@ TEST_F(TestRowSet, TestCompactStores) {
   int num_stores = dt->redo_delta_stores_.size();
   VLOG(1) << "Number of stores before compaction: " << num_stores;
   ASSERT_EQ(num_stores, 3);
-  ASSERT_OK(dt->CompactStores(0, num_stores - 1));
+  ASSERT_OK(dt->CompactStores(nullptr, 0, num_stores - 1));
   num_stores = dt->redo_delta_stores_.size();
   VLOG(1) << "Number of stores after compaction: " << num_stores;
   ASSERT_EQ(1,  num_stores);
@@ -591,7 +592,7 @@ TEST_F(TestRowSet, TestCompactStores) {
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
   unique_ptr<DeltaIterator> merge_iter;
-  ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(0, num_stores - 1, &schema_,
+  ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_,
                                                 &compacted_stores,
                                                 &compacted_blocks, &merge_iter));
   vector<string> results;
@@ -619,19 +620,19 @@ TEST_F(TestRowSet, TestGCAncientStores) {
 
   // Write and flush a new REDO delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
   ASSERT_EQ(0, dt->CountUndoDeltaStores());
   ASSERT_EQ(1, dt->CountRedoDeltaStores());
 
   // Convert the REDO delta to an UNDO delta.
-  ASSERT_OK(rs->MajorCompactDeltaStores(HistoryGcOpts::Disabled()));
+  ASSERT_OK(rs->MajorCompactDeltaStores(nullptr, HistoryGcOpts::Disabled()));
   ASSERT_EQ(1, dt->CountUndoDeltaStores()); // From doing the major delta compaction.
   ASSERT_EQ(0, dt->CountRedoDeltaStores());
 
   // Delete all the UNDO deltas. There shouldn't be any delta stores left.
   int64_t blocks_deleted;
   int64_t bytes_deleted;
-  ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_->Now(), &blocks_deleted, &bytes_deleted));
+  ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_->Now(), nullptr, &blocks_deleted, &bytes_deleted));
   ASSERT_GT(blocks_deleted, 0);
   ASSERT_GT(bytes_deleted, 0);
   ASSERT_EQ(0, dt->CountUndoDeltaStores());
@@ -649,7 +650,7 @@ TEST_F(TestRowSet, TestDiskSizeEstimation) {
 
   // Write a first delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
 
   // The rowset consists of the cfile set and REDO deltas, so the rowset's
   // on-disk size and the sum of the cfile set and REDO sizes should equal.
@@ -659,7 +660,7 @@ TEST_F(TestRowSet, TestDiskSizeEstimation) {
   ASSERT_EQ(rs->OnDiskSize(), drss.CFileSetOnDiskSize() + drss.redo_deltas_size);
 
   // Convert the REDO delta to an UNDO delta.
-  ASSERT_OK(rs->MajorCompactDeltaStores(HistoryGcOpts::Disabled()));
+  ASSERT_OK(rs->MajorCompactDeltaStores(nullptr, HistoryGcOpts::Disabled()));
 
   // REDO size should be zero, but there should be UNDOs, so the on-disk size
   // of the rowset should be the sum of the cfile set and UNDO sizes.
@@ -669,7 +670,7 @@ TEST_F(TestRowSet, TestDiskSizeEstimation) {
 
   // Write a second delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
-  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_OK(rs->FlushDeltas(nullptr));
 
   // Now there's base data, REDOs, and UNDOs.
   rs->GetDiskRowSetSpaceUsage(&drss);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 6517fdb..25c366b 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -94,6 +94,7 @@ using cfile::BloomFileWriter;
 using fs::BlockManager;
 using fs::BlockCreationTransaction;
 using fs::CreateBlockOptions;
+using fs::IOContext;
 using fs::WritableBlock;
 using log::LogAnchorRegistry;
 using std::shared_ptr;
@@ -508,12 +509,13 @@ RollingDiskRowSetWriter::~RollingDiskRowSetWriter() {
 Status DiskRowSet::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
                         log::LogAnchorRegistry* log_anchor_registry,
                         const TabletMemTrackers& mem_trackers,
+                        const IOContext* io_context,
                         shared_ptr<DiskRowSet> *rowset) {
   shared_ptr<DiskRowSet> rs(new DiskRowSet(rowset_metadata,
                                            log_anchor_registry,
                                            mem_trackers));
 
-  RETURN_NOT_OK(rs->Open());
+  RETURN_NOT_OK(rs->Open(io_context));
 
   rowset->swap(rs);
   return Status::OK();
@@ -529,15 +531,17 @@ DiskRowSet::DiskRowSet(shared_ptr<RowSetMetadata> rowset_metadata,
       num_rows_(-1),
       has_been_compacted_(false) {}
 
-Status DiskRowSet::Open() {
+Status DiskRowSet::Open(const IOContext* io_context) {
   TRACE_EVENT0("tablet", "DiskRowSet::Open");
   RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
                                mem_trackers_.tablet_tracker,
+                               io_context,
                                &base_data_));
 
   RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_,
                                    log_anchor_registry_,
                                    mem_trackers_,
+                                   io_context,
                                    &delta_tracker_));
 
   open_ = true;
@@ -545,17 +549,18 @@ Status DiskRowSet::Open() {
   return Status::OK();
 }
 
-Status DiskRowSet::FlushDeltas() {
+Status DiskRowSet::FlushDeltas(const IOContext* io_context) {
   TRACE_EVENT0("tablet", "DiskRowSet::FlushDeltas");
-  return delta_tracker_->Flush(DeltaTracker::FLUSH_METADATA);
+  return delta_tracker_->Flush(io_context, DeltaTracker::FLUSH_METADATA);
 }
 
-Status DiskRowSet::MinorCompactDeltaStores() {
+Status DiskRowSet::MinorCompactDeltaStores(const IOContext* io_context) {
   TRACE_EVENT0("tablet", "DiskRowSet::MinorCompactDeltaStores");
-  return delta_tracker_->Compact();
+  return delta_tracker_->Compact(io_context);
 }
 
-Status DiskRowSet::MajorCompactDeltaStores(HistoryGcOpts history_gc_opts) {
+Status DiskRowSet::MajorCompactDeltaStores(const IOContext* io_context,
+                                           HistoryGcOpts history_gc_opts) {
   vector<ColumnId> col_ids;
   delta_tracker_->GetColumnIdsWithUpdates(&col_ids);
 
@@ -564,10 +569,11 @@ Status DiskRowSet::MajorCompactDeltaStores(HistoryGcOpts history_gc_opts) {
     return Status::OK();
   }
 
-  return MajorCompactDeltaStoresWithColumnIds(col_ids, std::move(history_gc_opts));
+  return MajorCompactDeltaStoresWithColumnIds(col_ids, io_context, std::move(history_gc_opts));
 }
 
 Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& col_ids,
+                                                        const IOContext* io_context,
                                                         HistoryGcOpts history_gc_opts) {
   LOG_WITH_PREFIX(INFO) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
   TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
@@ -576,9 +582,10 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
 
   // TODO(todd): do we need to lock schema or anything here?
   gscoped_ptr<MajorDeltaCompaction> compaction;
-  RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts), &compaction));
+  RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts),
+                                        io_context, &compaction));
 
-  RETURN_NOT_OK(compaction->Compact());
+  RETURN_NOT_OK(compaction->Compact(io_context));
 
   // Before updating anything, create a copy of the rowset metadata so we can
   // revert changes in case of error.
@@ -599,12 +606,13 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
   // appropriate blocks to match the update.
   shared_ptr<CFileSet> new_base;
   RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
-                          mem_trackers_.tablet_tracker,
-                          &new_base));
+                               mem_trackers_.tablet_tracker,
+                               io_context,
+                               &new_base));
   {
     // Update the delta tracker and the base data with the changes.
     std::lock_guard<rw_spinlock> lock(component_lock_);
-    RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
+    RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get(), io_context));
     base_data_.swap(new_base);
   }
 
@@ -621,6 +629,7 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
 
 Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
                                            HistoryGcOpts history_gc_opts,
+                                           const IOContext* io_context,
                                            gscoped_ptr<MajorDeltaCompaction>* out) const {
   DCHECK(open_);
   shared_lock<rw_spinlock> l(component_lock_);
@@ -629,6 +638,7 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
 
   RowIteratorOptions opts;
   opts.projection = schema;
+  opts.io_context = io_context;
   vector<shared_ptr<DeltaStore>> included_stores;
   unique_ptr<DeltaIterator> delta_iter;
   RETURN_NOT_OK(delta_tracker_->NewDeltaFileIterator(
@@ -650,7 +660,8 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& opts,
   DCHECK(open_);
   shared_lock<rw_spinlock> l(component_lock_);
 
-  shared_ptr<CFileSet::Iterator> base_iter(base_data_->NewIterator(opts.projection));
+  shared_ptr<CFileSet::Iterator> base_iter(base_data_->NewIterator(opts.projection,
+                                                                   opts.io_context));
   gscoped_ptr<ColumnwiseIterator> col_iter;
   RETURN_NOT_OK(delta_tracker_->WrapIterator(base_iter, opts, &col_iter));
 
@@ -661,25 +672,27 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& opts,
 
 Status DiskRowSet::NewCompactionInput(const Schema* projection,
                                       const MvccSnapshot &snap,
-                                      gscoped_ptr<CompactionInput>* out) const  {
-  return CompactionInput::Create(*this, projection, snap, out);
+                                      const IOContext* io_context,
+                                      gscoped_ptr<CompactionInput>* out) const {
+  return CompactionInput::Create(*this, projection, snap, io_context, out);
 }
 
 Status DiskRowSet::MutateRow(Timestamp timestamp,
                              const RowSetKeyProbe &probe,
                              const RowChangeList &update,
                              const consensus::OpId& op_id,
+                             const IOContext* io_context,
                              ProbeStats* stats,
                              OperationResultPB* result) {
   DCHECK(open_);
 #ifndef NDEBUG
   rowid_t num_rows;
-  RETURN_NOT_OK(CountRows(&num_rows));
+  RETURN_NOT_OK(CountRows(io_context, &num_rows));
 #endif
   shared_lock<rw_spinlock> l(component_lock_);
 
   boost::optional<rowid_t> row_idx;
-  RETURN_NOT_OK(base_data_->FindRow(probe, &row_idx, stats));
+  RETURN_NOT_OK(base_data_->FindRow(probe, io_context, &row_idx, stats));
   if (PREDICT_FALSE(row_idx == boost::none)) {
     return Status::NotFound("row not found");
   }
@@ -690,7 +703,7 @@ Status DiskRowSet::MutateRow(Timestamp timestamp,
   // It's possible that the row key exists in this DiskRowSet, but it has
   // in fact been Deleted already. Check with the delta tracker to be sure.
   bool deleted;
-  RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(*row_idx, &deleted, stats));
+  RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(*row_idx, io_context, &deleted, stats));
   if (deleted) {
     return Status::NotFound("row not found");
   }
@@ -701,17 +714,18 @@ Status DiskRowSet::MutateRow(Timestamp timestamp,
 }
 
 Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
+                                   const IOContext* io_context,
                                    bool* present,
                                    ProbeStats* stats) const {
   DCHECK(open_);
 #ifndef NDEBUG
   rowid_t num_rows;
-  RETURN_NOT_OK(CountRows(&num_rows));
+  RETURN_NOT_OK(CountRows(io_context, &num_rows));
 #endif
   shared_lock<rw_spinlock> l(component_lock_);
 
   rowid_t row_idx;
-  RETURN_NOT_OK(base_data_->CheckRowPresent(probe, present, &row_idx, stats));
+  RETURN_NOT_OK(base_data_->CheckRowPresent(probe, io_context, present, &row_idx, stats));
   if (!*present) {
     // If it wasn't in the base data, then it's definitely not in the rowset.
     return Status::OK();
@@ -722,19 +736,19 @@ Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
 
   // Otherwise it might be in the base data but deleted.
   bool deleted = false;
-  RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(row_idx, &deleted, stats));
+  RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(row_idx, io_context, &deleted, stats));
   *present = !deleted;
   return Status::OK();
 }
 
-Status DiskRowSet::CountRows(rowid_t *count) const {
+Status DiskRowSet::CountRows(const IOContext* io_context, rowid_t *count) const {
   DCHECK(open_);
   rowid_t num_rows = num_rows_.load();
   if (PREDICT_TRUE(num_rows != -1)) {
     *count = num_rows;
   } else {
     shared_lock<rw_spinlock> l(component_lock_);
-    RETURN_NOT_OK(base_data_->CountRows(count));
+    RETURN_NOT_OK(base_data_->CountRows(io_context, count));
     num_rows_.store(*count);
   }
   return Status::OK();
@@ -845,17 +859,19 @@ Status DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient
 
 Status DiskRowSet::InitUndoDeltas(Timestamp ancient_history_mark,
                                   MonoTime deadline,
+                                  const IOContext* io_context,
                                   int64_t* delta_blocks_initialized,
                                   int64_t* bytes_in_ancient_undos) {
   TRACE_EVENT0("tablet", "DiskRowSet::InitUndoDeltas");
-  return delta_tracker_->InitUndoDeltas(ancient_history_mark, deadline,
+  return delta_tracker_->InitUndoDeltas(ancient_history_mark, deadline, io_context,
                                         delta_blocks_initialized, bytes_in_ancient_undos);
 }
 
 Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                           const IOContext* io_context,
                                            int64_t* blocks_deleted, int64_t* bytes_deleted) {
   TRACE_EVENT0("tablet", "DiskRowSet::DeleteAncientUndoDeltas");
-  return delta_tracker_->DeleteAncientUndoDeltas(ancient_history_mark,
+  return delta_tracker_->DeleteAncientUndoDeltas(ancient_history_mark, io_context,
                                                  blocks_deleted, bytes_deleted);
 }
 
@@ -865,7 +881,7 @@ Status DiskRowSet::DebugDump(vector<string> *lines) {
   gscoped_ptr<CompactionInput> input;
   RETURN_NOT_OK(NewCompactionInput(&rowset_metadata_->tablet_schema(),
                                    MvccSnapshot::CreateSnapshotIncludingAllTransactions(),
-                                   &input));
+                                   nullptr, &input));
   return DebugDumpCompactionInput(input.get(), lines);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 67e587c..de2e7f6 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -68,6 +68,7 @@ class OpId;
 
 namespace fs {
 class BlockCreationTransaction;
+struct IOContext;
 }
 
 namespace log {
@@ -312,6 +313,7 @@ class DiskRowSet : public RowSet {
   static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata,
                      log::LogAnchorRegistry* log_anchor_registry,
                      const TabletMemTrackers& mem_trackers,
+                     const fs::IOContext* io_context,
                      std::shared_ptr<DiskRowSet> *rowset);
 
   ////////////////////////////////////////////////////////////
@@ -319,12 +321,12 @@ class DiskRowSet : public RowSet {
   ////////////////////////////////////////////////////////////
 
   // Flush all accumulated delta data to disk.
-  Status FlushDeltas() override;
+  Status FlushDeltas(const fs::IOContext* io_context) override;
 
   // Perform delta store minor compaction.
   // This compacts the delta files down to a single one.
   // If there is already only a single delta file, this does nothing.
-  Status MinorCompactDeltaStores() override;
+  Status MinorCompactDeltaStores(const fs::IOContext* io_context) override;
 
   ////////////////////////////////////////////////////////////
   // RowSet implementation
@@ -341,12 +343,12 @@ class DiskRowSet : public RowSet {
                    const RowSetKeyProbe &probe,
                    const RowChangeList &update,
                    const consensus::OpId& op_id,
+                   const fs::IOContext* io_context,
                    ProbeStats* stats,
                    OperationResultPB* result) override;
 
-  Status CheckRowPresent(const RowSetKeyProbe &probe,
-                         bool *present,
-                         ProbeStats* stats) const override;
+  Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
+                         bool *present, ProbeStats* stats) const override;
 
   ////////////////////
   // Read functions.
@@ -356,11 +358,12 @@ class DiskRowSet : public RowSet {
 
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
+                                    const fs::IOContext* io_context,
                                     gscoped_ptr<CompactionInput>* out) const override;
 
   // Gets the number of rows in this rowset, checking 'num_rows_' first. If not
   // yet set, consults the base data and stores the result in 'num_rows_'.
-  Status CountRows(rowid_t *count) const final override;
+  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override;
 
   // See RowSet::GetBounds(...)
   virtual Status GetBounds(std::string* min_encoded_key,
@@ -389,14 +392,15 @@ class DiskRowSet : public RowSet {
 
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
+                        const fs::IOContext* io_context,
                         int64_t* delta_blocks_initialized,
                         int64_t* bytes_in_ancient_undos) override;
 
-  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark, const fs::IOContext* io_context,
                                  int64_t* blocks_deleted, int64_t* bytes_deleted) override;
 
   // Major compacts all the delta files for all the columns.
-  Status MajorCompactDeltaStores(HistoryGcOpts history_gc_opts);
+  Status MajorCompactDeltaStores(const fs::IOContext* io_context, HistoryGcOpts history_gc_opts);
 
   std::mutex *compact_flush_lock() override {
     return &compact_flush_lock_;
@@ -444,15 +448,17 @@ class DiskRowSet : public RowSet {
              log::LogAnchorRegistry* log_anchor_registry,
              TabletMemTrackers mem_trackers);
 
-  Status Open();
+  Status Open(const fs::IOContext* io_context);
 
   // Create a new major delta compaction object to compact the specified columns.
   Status NewMajorDeltaCompaction(const std::vector<ColumnId>& col_ids,
                                  HistoryGcOpts history_gc_opts,
+                                 const fs::IOContext* io_context,
                                  gscoped_ptr<MajorDeltaCompaction>* out) const;
 
   // Major compacts all the delta files for the specified columns.
   Status MajorCompactDeltaStoresWithColumnIds(const std::vector<ColumnId>& col_ids,
+                                              const fs::IOContext* io_context,
                                               HistoryGcOpts history_gc_opts);
 
   std::shared_ptr<RowSetMetadata> rowset_metadata_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/memrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index ba5134e..1823f14 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -122,7 +122,7 @@ class TestMemRowSet : public KuduTest {
     RowSetKeyProbe probe(rb.row());
     ProbeStats stats;
 
-    return mrs.CheckRowPresent(probe, present, &stats);
+    return mrs.CheckRowPresent(probe, nullptr, present, &stats);
   }
 
   Status InsertRows(MemRowSet *mrs, int num_rows) {
@@ -169,6 +169,7 @@ class TestMemRowSet : public KuduTest {
                               probe,
                               RowChangeList(mutation_buf_),
                               op_id_,
+                              nullptr,
                               &stats,
                               result);
     tx.Commit();
@@ -191,6 +192,7 @@ class TestMemRowSet : public KuduTest {
                               probe,
                               RowChangeList(mutation_buf_),
                               op_id_,
+                              nullptr,
                               &stats,
                               result);
     tx.Commit();

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/memrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 0b68782..da107af 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -60,6 +60,7 @@ using std::vector;
 namespace kudu { namespace tablet {
 
 using consensus::OpId;
+using fs::IOContext;
 using log::LogAnchorRegistry;
 using strings::Substitute;
 
@@ -221,6 +222,7 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
                             const RowSetKeyProbe &probe,
                             const RowChangeList &delta,
                             const consensus::OpId& op_id,
+                            const IOContext* /*io_context*/,
                             ProbeStats* stats,
                             OperationResultPB *result) {
   {
@@ -259,8 +261,8 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
   return Status::OK();
 }
 
-Status MemRowSet::CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                                  ProbeStats* stats) const {
+Status MemRowSet::CheckRowPresent(const RowSetKeyProbe &probe, const IOContext* /*io_context*/,
+                                  bool* present, ProbeStats* stats) const {
   // Use a PreparedMutation here even though we don't plan to mutate. Even though
   // this takes a lock rather than an optimistic copy, it should be a very short
   // critical section, and this call is only made on updates, which are rare.
@@ -307,6 +309,7 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions& opts,
 
 Status MemRowSet::NewCompactionInput(const Schema* projection,
                                      const MvccSnapshot& snap,
+                                     const IOContext* /*io_context*/,
                                      gscoped_ptr<CompactionInput>* out) const  {
   out->reset(CompactionInput::Create(*this, projection, snap));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 429f734..8b2998a 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -56,13 +56,17 @@ class RowChangeList;
 class ScanSpec;
 struct IteratorStats;
 
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace tablet {
 class MvccSnapshot;
-} // namespace tablet
+}  // namespace tablet
 
 namespace consensus {
 class OpId;
-} // namespace consensus
+}  // namespace consensus
 
 namespace tablet {
 //
@@ -238,6 +242,7 @@ class MemRowSet : public RowSet,
                            const RowSetKeyProbe &probe,
                            const RowChangeList &delta,
                            const consensus::OpId& op_id,
+                           const fs::IOContext* io_context,
                            ProbeStats* stats,
                            OperationResultPB *result) override;
 
@@ -249,7 +254,7 @@ class MemRowSet : public RowSet,
   }
 
   // Conform entry_count to RowSet
-  Status CountRows(rowid_t *count) const override {
+  Status CountRows(const fs::IOContext* /*io_context*/, rowid_t *count) const override {
     *count = entry_count();
     return Status::OK();
   }
@@ -292,8 +297,8 @@ class MemRowSet : public RowSet,
   }
 
   // TODO(todd): unit test me
-  Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                         ProbeStats* stats) const override;
+  Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
+                         bool *present, ProbeStats* stats) const override;
 
   // Return the memory footprint of this memrowset.
   // Note that this may be larger than the sum of the data
@@ -320,6 +325,7 @@ class MemRowSet : public RowSet,
   // Create compaction input.
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot& snap,
+                                    const fs::IOContext* io_context,
                                     gscoped_ptr<CompactionInput>* out) const override;
 
   // Return the Schema for the rows in this memrowset.
@@ -385,6 +391,7 @@ class MemRowSet : public RowSet,
 
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
+                        const fs::IOContext* /*io_context*/,
                         int64_t* delta_blocks_initialized,
                         int64_t* bytes_in_ancient_undos) override {
     if (delta_blocks_initialized) *delta_blocks_initialized = 0;
@@ -393,15 +400,17 @@ class MemRowSet : public RowSet,
   }
 
   Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                 const fs::IOContext* /*io_context*/,
                                  int64_t* blocks_deleted, int64_t* bytes_deleted) override {
     if (blocks_deleted) *blocks_deleted = 0;
     if (bytes_deleted) *bytes_deleted = 0;
     return Status::OK();
   }
 
-  Status FlushDeltas() override { return Status::OK(); }
+  Status FlushDeltas(const fs::IOContext* /*io_context*/) override { return Status::OK(); }
 
-  Status MinorCompactDeltaStores() override { return Status::OK(); }
+  Status MinorCompactDeltaStores(
+      const fs::IOContext* /*io_context*/) override { return Status::OK(); }
 
  private:
   friend class Iterator;

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/mock-rowsets.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index b83fbc5..4a65dfb 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -28,22 +28,29 @@
 #include "kudu/tablet/rowset_metadata.h"
 
 namespace kudu {
+
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace tablet {
 
 // Mock implementation of RowSet which just aborts on every call.
 class MockRowSet : public RowSet {
  public:
-  virtual Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                                 ProbeStats* stats) const OVERRIDE {
+  virtual Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
+                                 const fs::IOContext* /*io_context*/,
+                                 bool* /*present*/, ProbeStats* /*stats*/) const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status MutateRow(Timestamp timestamp,
-                           const RowSetKeyProbe &probe,
-                           const RowChangeList &update,
-                           const consensus::OpId& op_id_,
-                           ProbeStats* stats,
-                           OperationResultPB *result) OVERRIDE {
+  virtual Status MutateRow(Timestamp /*timestamp*/,
+                           const RowSetKeyProbe& /*probe*/,
+                           const RowChangeList& /*update*/,
+                           const consensus::OpId& /*op_id_*/,
+                           const fs::IOContext* /*io_context*/,
+                           ProbeStats* /*stats*/,
+                           OperationResultPB* /*result*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -52,13 +59,14 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status NewCompactionInput(const Schema* projection,
-                                    const MvccSnapshot &snap,
-                                    gscoped_ptr<CompactionInput>* out) const OVERRIDE {
+  virtual Status NewCompactionInput(const Schema* /*projection*/,
+                                    const MvccSnapshot& /*snap*/,
+                                    const fs::IOContext* /*io_context*/,
+                                    gscoped_ptr<CompactionInput>* /*out*/) const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountRows(rowid_t *count) const OVERRIDE {
+  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/) const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -66,7 +74,7 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return "";
   }
-  virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE {
+  virtual Status DebugDump(std::vector<std::string>* /*lines*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -116,18 +124,18 @@ class MockRowSet : public RowSet {
     return -1;
   }
 
-  virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type)
+  virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType /*type*/)
       const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual Status FlushDeltas() OVERRIDE {
+  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status MinorCompactDeltaStores() OVERRIDE {
+  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -140,6 +148,7 @@ class MockRowSet : public RowSet {
 
   virtual Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                                 MonoTime /*deadline*/,
+                                const fs::IOContext* /*io_context*/,
                                 int64_t* /*delta_blocks_initialized*/,
                                 int64_t* /*bytes_in_ancient_undos*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";
@@ -147,6 +156,7 @@ class MockRowSet : public RowSet {
   }
 
   virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                         const fs::IOContext* /*io_context*/,
                                          int64_t* /*blocks_deleted*/,
                                          int64_t* /*bytes_deleted*/) OVERRIDE {
     LOG(FATAL) << "Unimplemented";

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/mt-diskrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mt-diskrowset-test.cc b/src/kudu/tablet/mt-diskrowset-test.cc
index 12906dd..7e494d8 100644
--- a/src/kudu/tablet/mt-diskrowset-test.cc
+++ b/src/kudu/tablet/mt-diskrowset-test.cc
@@ -50,7 +50,7 @@ class TestMultiThreadedRowSet : public TestRowSet {
 
   void FlushThread(DiskRowSet *rs) {
     for (int i = 0; i < 10; i++) {
-      CHECK_OK(rs->FlushDeltas());
+      CHECK_OK(rs->FlushDeltas(nullptr));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
index ad1acab..28e827e 100644
--- a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
+++ b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
@@ -93,7 +93,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
   void RowSetFlushThread(DiskRowSet *rs) {
     while (ShouldRun()) {
       if (rs->CountDeltaStores() < 5) {
-        CHECK_OK(rs->FlushDeltas());
+        CHECK_OK(rs->FlushDeltas(nullptr));
       } else {
         SleepFor(MonoDelta::FromMilliseconds(10));
       }
@@ -102,7 +102,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
 
   void RowSetDeltaCompactionThread(DiskRowSet *rs) {
     while (ShouldRun()) {
-      CHECK_OK(rs->MinorCompactDeltaStores());
+      CHECK_OK(rs->MinorCompactDeltaStores(nullptr));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/rowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 4e94e6d..0dec136 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -36,6 +36,9 @@ using std::vector;
 using strings::Substitute;
 
 namespace kudu {
+
+using fs::IOContext;
+
 namespace tablet {
 
 RowIteratorOptions::RowIteratorOptions()
@@ -110,9 +113,10 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
   return Status::OK();
 }
 
-Status DuplicatingRowSet::NewCompactionInput(const Schema* projection,
-                                             const MvccSnapshot &snap,
-                                             gscoped_ptr<CompactionInput>* out) const  {
+Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/,
+                                             const MvccSnapshot& /*snap*/,
+                                             const IOContext* /*io_context*/,
+                                             gscoped_ptr<CompactionInput>* /*out*/) const  {
   LOG(FATAL) << "duplicating rowsets do not act as compaction input";
   return Status::OK();
 }
@@ -122,6 +126,7 @@ Status DuplicatingRowSet::MutateRow(Timestamp timestamp,
                                     const RowSetKeyProbe &probe,
                                     const RowChangeList &update,
                                     const consensus::OpId& op_id,
+                                    const IOContext* io_context,
                                     ProbeStats* stats,
                                     OperationResultPB* result) {
   // Duplicate the update to both the relevant input rowset and the output rowset.
@@ -135,7 +140,7 @@ Status DuplicatingRowSet::MutateRow(Timestamp timestamp,
   // First mutate the relevant input rowset.
   bool updated = false;
   for (const shared_ptr<RowSet> &rowset : old_rowsets_) {
-    Status s = rowset->MutateRow(timestamp, probe, update, op_id, stats, result);
+    Status s = rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result);
     if (s.ok()) {
       updated = true;
       break;
@@ -155,7 +160,7 @@ Status DuplicatingRowSet::MutateRow(Timestamp timestamp,
   // If it succeeded there, we also need to mirror into the new rowset.
   int mirrored_count = 0;
   for (const shared_ptr<RowSet> &new_rowset : new_rowsets_) {
-    Status s = new_rowset->MutateRow(timestamp, probe, update, op_id, stats, result);
+    Status s = new_rowset->MutateRow(timestamp, probe, update, op_id, io_context, stats, result);
     if (s.ok()) {
       mirrored_count++;
       #ifdef NDEBUG
@@ -176,11 +181,11 @@ Status DuplicatingRowSet::MutateRow(Timestamp timestamp,
   return Status::OK();
 }
 
-Status DuplicatingRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
+Status DuplicatingRowSet::CheckRowPresent(const RowSetKeyProbe &probe, const IOContext* io_context,
                                           bool *present, ProbeStats* stats) const {
   *present = false;
   for (const shared_ptr<RowSet> &rowset : old_rowsets_) {
-    RETURN_NOT_OK(rowset->CheckRowPresent(probe, present, stats));
+    RETURN_NOT_OK(rowset->CheckRowPresent(probe, io_context, present, stats));
     if (*present) {
       return Status::OK();
     }
@@ -188,11 +193,11 @@ Status DuplicatingRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
   return Status::OK();
 }
 
-Status DuplicatingRowSet::CountRows(rowid_t *count) const {
+Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count) const {
   int64_t accumulated_count = 0;
   for (const shared_ptr<RowSet> &rs : new_rowsets_) {
     rowid_t this_count;
-    RETURN_NOT_OK(rs->CountRows(&this_count));
+    RETURN_NOT_OK(rs->CountRows(io_context, &this_count));
     accumulated_count += this_count;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/rowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 47ddf82..1d1c36d 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -53,6 +53,10 @@ namespace consensus {
 class OpId;
 }
 
+namespace fs {
+struct IOContext;
+}
+
 namespace tablet {
 
 class CompactionInput;
@@ -88,6 +92,11 @@ struct RowIteratorOptions {
   // Defaults to UNORDERED.
   OrderMode order;
 
+  // Context of IO.
+  //
+  // Defaults to nullptr.
+  const fs::IOContext* io_context;
+
   // Whether iteration should include rows whose last mutation was a DELETE.
   //
   // Defaults to false.
@@ -108,8 +117,8 @@ class RowSet {
   // If the row was once present in this rowset, but no longer present
   // due to a DELETE, then this should set *present = false, as if
   // it were never there.
-  virtual Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                                 ProbeStats* stats) const = 0;
+  virtual Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
+                                 bool *present, ProbeStats* stats) const = 0;
 
   // Update/delete a row in this rowset.
   // The 'update_schema' is the client schema used to encode the 'update' RowChangeList.
@@ -120,6 +129,7 @@ class RowSet {
                            const RowSetKeyProbe &probe,
                            const RowChangeList &update,
                            const consensus::OpId& op_id,
+                           const fs::IOContext* io_context,
                            ProbeStats* stats,
                            OperationResultPB* result) = 0;
 
@@ -140,10 +150,11 @@ class RowSet {
   // will be projected into this Schema.
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
+                                    const fs::IOContext* io_context,
                                     gscoped_ptr<CompactionInput>* out) const = 0;
 
   // Count the number of rows in this rowset.
-  virtual Status CountRows(rowid_t *count) const = 0;
+  virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) const = 0;
 
   // Return the bounds for this RowSet. 'min_encoded_key' and 'max_encoded_key'
   // are set to the first and last encoded keys for this RowSet.
@@ -192,10 +203,10 @@ class RowSet {
   virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const = 0;
 
   // Flush the DMS if there's one
-  virtual Status FlushDeltas() = 0;
+  virtual Status FlushDeltas(const fs::IOContext* io_context) = 0;
 
   // Compact delta stores if more than one.
-  virtual Status MinorCompactDeltaStores() = 0;
+  virtual Status MinorCompactDeltaStores(const fs::IOContext* io_context) = 0;
 
   // Estimate the number of bytes in ancient undo delta stores. This may be an
   // overestimate. The argument 'ancient_history_mark' must be valid (it may
@@ -223,6 +234,7 @@ class RowSet {
   // may be passed in as nullptr.
   virtual Status InitUndoDeltas(Timestamp ancient_history_mark,
                                 MonoTime deadline,
+                                const fs::IOContext* io_context,
                                 int64_t* delta_blocks_initialized,
                                 int64_t* bytes_in_ancient_undos) = 0;
 
@@ -244,6 +256,7 @@ class RowSet {
   // The out-parameters, 'blocks_deleted' and 'bytes_deleted', may be passed in
   // as nullptr.
   virtual Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                         const fs::IOContext* io_context,
                                          int64_t* blocks_deleted,
                                          int64_t* bytes_deleted) = 0;
 
@@ -366,20 +379,22 @@ class DuplicatingRowSet : public RowSet {
                            const RowSetKeyProbe &probe,
                            const RowChangeList &update,
                            const consensus::OpId& op_id,
+                           const fs::IOContext* io_context,
                            ProbeStats* stats,
                            OperationResultPB* result) OVERRIDE;
 
-  Status CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
-                         ProbeStats* stats) const OVERRIDE;
+  Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
+                         bool *present, ProbeStats* stats) const OVERRIDE;
 
   virtual Status NewRowIterator(const RowIteratorOptions& opts,
                                 gscoped_ptr<RowwiseIterator>* out) const OVERRIDE;
 
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
+                                    const fs::IOContext* io_context,
                                     gscoped_ptr<CompactionInput>* out) const OVERRIDE;
 
-  Status CountRows(rowid_t *count) const OVERRIDE;
+  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const OVERRIDE;
 
   virtual Status GetBounds(std::string* min_encoded_key,
                            std::string* max_encoded_key) const OVERRIDE;
@@ -429,7 +444,7 @@ class DuplicatingRowSet : public RowSet {
 
   int64_t MinUnflushedLogIndex() const OVERRIDE { return -1; }
 
-  Status FlushDeltas() OVERRIDE {
+  Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
     // It's important that DuplicatingRowSet does not FlushDeltas. This prevents
     // a bug where we might end up with out-of-order deltas. See the long
     // comment in Tablet::Flush(...)
@@ -445,6 +460,7 @@ class DuplicatingRowSet : public RowSet {
 
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
+                        const fs::IOContext* /*io_context*/,
                         int64_t* delta_blocks_initialized,
                         int64_t* bytes_in_ancient_undos) OVERRIDE {
     if (delta_blocks_initialized) *delta_blocks_initialized = 0;
@@ -453,13 +469,15 @@ class DuplicatingRowSet : public RowSet {
   }
 
   Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                 const fs::IOContext* /*io_context*/,
                                  int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE {
     if (blocks_deleted) *blocks_deleted = 0;
     if (bytes_deleted) *bytes_deleted = 0;
     return Status::OK();
   }
 
-  Status MinorCompactDeltaStores() OVERRIDE { return Status::OK(); }
+  Status MinorCompactDeltaStores(
+      const fs::IOContext* /*io_context*/) OVERRIDE { return Status::OK(); }
 
  private:
   friend class Tablet;

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 6fe3e96..0c57724 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -51,6 +51,7 @@
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/casts.h"
@@ -172,6 +173,7 @@ METRIC_DEFINE_gauge_size(tablet, on_disk_data_size, "Tablet Data Size On Disk",
 
 using kudu::MaintenanceManager;
 using kudu::clock::HybridClock;
+using kudu::fs::IOContext;
 using kudu::log::LogAnchorRegistry;
 using std::ostream;
 using std::pair;
@@ -267,12 +269,14 @@ Status Tablet::Open() {
 
   RowSetVector rowsets_opened;
 
+  fs::IOContext io_context({ tablet_id() });
   // open the tablet row-sets
   for (const shared_ptr<RowSetMetadata>& rowset_meta : metadata_->rowsets()) {
     shared_ptr<DiskRowSet> rowset;
     Status s = DiskRowSet::Open(rowset_meta,
                                 log_anchor_registry_.get(),
                                 mem_trackers_,
+                                &io_context,
                                 &rowset);
     if (!s.ok()) {
       LOG_WITH_PREFIX(ERROR) << "Failed to open rowset " << rowset_meta->ToString() << ": "
@@ -379,8 +383,9 @@ Status Tablet::NewRowIterator(const Schema &projection,
   if (metrics_) {
     metrics_->scans_started->Increment();
   }
+  IOContext io_context({ tablet_id() });
   VLOG_WITH_PREFIX(2) << "Created new Iterator under snap: " << snap.ToString();
-  iter->reset(new Iterator(this, projection, snap, order));
+  iter->reset(new Iterator(this, projection, snap, order, std::move(io_context)));
   return Status::OK();
 }
 
@@ -570,7 +575,8 @@ Status Tablet::ValidateMutateUnlocked(const RowOp& op) const {
   return Status::OK();
 }
 
-Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
+Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
+                                      WriteTransactionState *tx_state,
                                       RowOp* op,
                                       ProbeStats* stats) {
   DCHECK(op->checked_present);
@@ -581,7 +587,7 @@ Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
 
   if (op->present_in_rowset) {
     if (is_upsert) {
-      return ApplyUpsertAsUpdate(tx_state, op, op->present_in_rowset, stats);
+      return ApplyUpsertAsUpdate(io_context, tx_state, op, op->present_in_rowset, stats);
     }
     Status s = Status::AlreadyPresent("key already present");
     if (metrics_) {
@@ -605,7 +611,7 @@ Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
   } else {
     if (s.IsAlreadyPresent()) {
       if (is_upsert) {
-        return ApplyUpsertAsUpdate(tx_state, op, comps->memrowset.get(), stats);
+        return ApplyUpsertAsUpdate(io_context, tx_state, op, comps->memrowset.get(), stats);
       }
       if (metrics_) {
         metrics_->insertions_failed_dup_key->Increment();
@@ -616,7 +622,8 @@ Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
   return s;
 }
 
-Status Tablet::ApplyUpsertAsUpdate(WriteTransactionState* tx_state,
+Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
+                                   WriteTransactionState* tx_state,
                                    RowOp* upsert,
                                    RowSet* rowset,
                                    ProbeStats* stats) {
@@ -652,6 +659,7 @@ Status Tablet::ApplyUpsertAsUpdate(WriteTransactionState* tx_state,
                                *upsert->key_probe,
                                rcl,
                                tx_state->op_id(),
+                               io_context,
                                stats,
                                result.get());
   CHECK(!s.IsNotFound());
@@ -706,7 +714,8 @@ vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op,
   return to_check;
 }
 
-Status Tablet::MutateRowUnlocked(WriteTransactionState *tx_state,
+Status Tablet::MutateRowUnlocked(const IOContext* io_context,
+                                 WriteTransactionState *tx_state,
                                  RowOp* mutate,
                                  ProbeStats* stats) {
   DCHECK(mutate->checked_present);
@@ -724,6 +733,7 @@ Status Tablet::MutateRowUnlocked(WriteTransactionState *tx_state,
                                       *mutate->key_probe,
                                       mutate->decoded_op.changelist,
                                       tx_state->op_id(),
+                                      io_context,
                                       stats,
                                       result.get());
   if (PREDICT_TRUE(s.ok())) {
@@ -744,7 +754,7 @@ void Tablet::StartApplying(WriteTransactionState* tx_state) {
   tx_state->set_tablet_components(components_);
 }
 
-Status Tablet::BulkCheckPresence(WriteTransactionState* tx_state) {
+Status Tablet::BulkCheckPresence(const IOContext* io_context, WriteTransactionState* tx_state) {
   int num_ops = tx_state->row_ops().size();
 
   // TODO(todd) determine why we sometimes get empty writes!
@@ -834,7 +844,8 @@ Status Tablet::BulkCheckPresence(WriteTransactionState* tx_state) {
       }
 
       bool present = false;
-      s = rs->CheckRowPresent(*op->key_probe, &present, tx_state->mutable_op_stats(op_idx));
+      s = rs->CheckRowPresent(*op->key_probe, io_context,
+                              &present, tx_state->mutable_op_stats(op_idx));
       if (PREDICT_FALSE(!s.ok())) {
         LOG(WARNING) << Substitute("Tablet $0 failed to check row presence for op $1: $2",
             tablet_id(), op->ToString(key_schema_), s.ToString());
@@ -892,14 +903,16 @@ Status Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
     ValidateOpOrMarkFailed(op);
   }
 
-  RETURN_NOT_OK(BulkCheckPresence(tx_state));
+  IOContext io_context({ tablet_id() });
+  RETURN_NOT_OK(BulkCheckPresence(&io_context, tx_state));
 
   // Actually apply the ops.
   for (int op_idx = 0; op_idx < num_ops; op_idx++) {
     RowOp* row_op = tx_state->row_ops()[op_idx];
     if (row_op->has_result()) continue;
 
-    RETURN_NOT_OK(ApplyRowOperation(tx_state, row_op, tx_state->mutable_op_stats(op_idx)));
+    RETURN_NOT_OK(ApplyRowOperation(&io_context, tx_state, row_op,
+                                    tx_state->mutable_op_stats(op_idx)));
     DCHECK(row_op->has_result());
   }
 
@@ -909,7 +922,8 @@ Status Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
   return Status::OK();
 }
 
-Status Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
+Status Tablet::ApplyRowOperation(const IOContext* io_context,
+                                 WriteTransactionState* tx_state,
                                  RowOp* row_op,
                                  ProbeStats* stats) {
   {
@@ -933,7 +947,8 @@ Status Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
     vector<RowSet *> to_check = FindRowSetsToCheck(row_op, tx_state->tablet_components());
     for (RowSet *rowset : to_check) {
       bool present = false;
-      RETURN_NOT_OK_PREPEND(rowset->CheckRowPresent(*row_op->key_probe, &present, stats),
+      RETURN_NOT_OK_PREPEND(rowset->CheckRowPresent(*row_op->key_probe, io_context,
+                                                    &present, stats),
           "Failed to check if row is present");
       if (present) {
         row_op->present_in_rowset = rowset;
@@ -947,7 +962,7 @@ Status Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
   switch (row_op->decoded_op.type) {
     case RowOperationsPB::INSERT:
     case RowOperationsPB::UPSERT:
-      s = InsertOrUpsertUnlocked(tx_state, row_op, stats);
+      s = InsertOrUpsertUnlocked(io_context, tx_state, row_op, stats);
       if (s.IsAlreadyPresent()) {
         return Status::OK();
       }
@@ -955,7 +970,7 @@ Status Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
 
     case RowOperationsPB::UPDATE:
     case RowOperationsPB::DELETE:
-      s = MutateRowUnlocked(tx_state, row_op, stats);
+      s = MutateRowUnlocked(io_context, tx_state, row_op, stats);
       if (s.IsNotFound()) {
         return Status::OK();
       }
@@ -1023,7 +1038,7 @@ Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids,
                                       const shared_ptr<RowSet>& input_rs) {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   Status s = down_cast<DiskRowSet*>(input_rs.get())
-      ->MajorCompactDeltaStoresWithColumnIds(col_ids, GetHistoryGcOpts());
+      ->MajorCompactDeltaStoresWithColumnIds(col_ids, nullptr, GetHistoryGcOpts());
   return s;
 }
 
@@ -1439,6 +1454,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                "tablet_id", tablet_id(),
                "op", op_name);
 
+  const IOContext io_context({ tablet_id() });
+
   MvccSnapshot flush_snap(mvcc_);
   LOG_WITH_PREFIX(INFO) << op_name << ": entering phase 1 (flushing snapshot). Phase 1 snapshot: "
                         << flush_snap.ToString();
@@ -1449,7 +1466,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   }
 
   shared_ptr<CompactionInput> merge;
-  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &merge));
+  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &io_context, &merge));
 
   RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
                                compaction_policy_->target_rowset_size());
@@ -1489,6 +1506,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
       Status s = DiskRowSet::Open(meta,
                                   log_anchor_registry_.get(),
                                   mem_trackers_,
+                                  &io_context,
                                   &new_rowset);
       if (!s.ok()) {
         LOG_WITH_PREFIX(WARNING) << "Unable to open snapshot " << op_name << " results "
@@ -1584,14 +1602,14 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                         << " Phase 2: carrying over any updates which arrived during Phase 1";
   LOG_WITH_PREFIX(INFO) << "Phase 2 snapshot: " << non_duplicated_txns_snap.ToString();
   RETURN_NOT_OK_PREPEND(
-      input.CreateCompactionInput(non_duplicated_txns_snap, schema(), &merge),
+      input.CreateCompactionInput(non_duplicated_txns_snap, schema(), &io_context, &merge),
           Substitute("Failed to create $0 inputs", op_name).c_str());
 
   // Update the output rowsets with the deltas that came in in phase 1, before we swapped
   // in the DuplicatingRowSets. This will perform a flush of the updated DeltaTrackers
   // in the end so that the data that is reported in the log as belonging to the input
   // rowsets is flushed.
-  RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(metadata_->tablet_id(),
+  RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(&io_context,
                                              merge.get(),
                                              history_gc_opts,
                                              flush_snap,
@@ -1728,6 +1746,7 @@ Status Tablet::CaptureConsistentIterators(
     const MvccSnapshot& snap,
     const ScanSpec* spec,
     OrderMode order,
+    const IOContext* io_context,
     vector<shared_ptr<RowwiseIterator>>* iters) const {
 
   shared_lock<rw_spinlock> l(component_lock_);
@@ -1747,12 +1766,14 @@ Status Tablet::CaptureConsistentIterators(
   RETURN_NOT_OK(components_->memrowset->NewRowIterator(opts, &ms_iter));
   ret.emplace_back(ms_iter.release());
 
+  opts.io_context = io_context;
+
   // Cull row-sets in the case of key-range queries.
   if (spec != nullptr && spec->lower_bound_key() && spec->exclusive_upper_bound_key()) {
-    // TODO : support open-ended intervals
-    // TODO: the upper bound key is exclusive, but the RowSetTree function takes
-    // an inclusive interval. So, we might end up fetching one more rowset than
-    // necessary.
+    // TODO(todd): support open-ended intervals
+    // TODO(todd): the upper bound key is exclusive, but the RowSetTree
+    // function takes an inclusive interval. So, we might end up fetching one
+    // more rowset than necessary.
     vector<RowSet *> interval_sets;
     components_->rowsets->FindRowSetsIntersectingInterval(
         spec->lower_bound_key()->encoded_key(),
@@ -1790,10 +1811,11 @@ Status Tablet::CountRows(uint64_t *count) const {
   GetComponents(&comps);
 
   // Now sum up the counts.
+  IOContext io_context({ tablet_id() });
   *count = comps->memrowset->entry_count();
   for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
     rowid_t l_count;
-    RETURN_NOT_OK(rowset->CountRows(&l_count));
+    RETURN_NOT_OK(rowset->CountRows(&io_context, &l_count));
     *count += l_count;
   }
 
@@ -1894,7 +1916,8 @@ Status Tablet::FlushBestDMS(const ReplaySizeMap &replay_size_map) const {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
   if (rowset) {
-    return rowset->FlushDeltas();
+    IOContext io_context({ tablet_id() });
+    return rowset->FlushDeltas(&io_context);
   }
   return Status::OK();
 }
@@ -1959,7 +1982,7 @@ Status Tablet::FlushBiggestDMS() {
       biggest_drs = rowset;
     }
   }
-  return max_size > 0 ? biggest_drs->FlushDeltas() : Status::OK();
+  return max_size > 0 ? biggest_drs->FlushDeltas(nullptr) : Status::OK();
 }
 
 Status Tablet::FlushAllDMSForTests() {
@@ -1967,7 +1990,7 @@ Status Tablet::FlushAllDMSForTests() {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
   for (const auto& rowset : comps->rowsets->all_rowsets()) {
-    RETURN_NOT_OK(rowset->FlushDeltas());
+    RETURN_NOT_OK(rowset->FlushDeltas(nullptr));
   }
   return Status::OK();
 }
@@ -1977,12 +2000,13 @@ Status Tablet::MajorCompactAllDeltaStoresForTests() {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
+  IOContext io_context({ tablet_id() });
   for (const auto& rs : comps->rowsets->all_rowsets()) {
     if (!rs->IsAvailableForCompaction()) continue;
     DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
     RETURN_NOT_OK(drs->delta_tracker()->InitAllDeltaStoresForTests(DeltaTracker::REDOS_ONLY));
-    RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(
-        GetHistoryGcOpts()), "Failed major delta compaction on " + rs->ToString());
+    RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(&io_context, GetHistoryGcOpts()),
+                          "Failed major delta compaction on " + rs->ToString());
   }
   return Status::OK();
 }
@@ -2010,12 +2034,14 @@ Status Tablet::CompactWorstDeltas(RowSet::DeltaCompactionType type) {
   // We just released compact_select_lock_ so other compactions can select and run, but the
   // rowset is ours.
   DCHECK(perf_improv != 0);
+  IOContext io_context({ tablet_id() });
   if (type == RowSet::MINOR_DELTA_COMPACTION) {
-    RETURN_NOT_OK_PREPEND(rs->MinorCompactDeltaStores(),
+    RETURN_NOT_OK_PREPEND(rs->MinorCompactDeltaStores(&io_context),
                           "Failed minor delta compaction on " + rs->ToString());
   } else if (type == RowSet::MAJOR_DELTA_COMPACTION) {
-    RETURN_NOT_OK_PREPEND(down_cast<DiskRowSet*>(rs.get())->MajorCompactDeltaStores(
-        GetHistoryGcOpts()), "Failed major delta compaction on " + rs->ToString());
+    RETURN_NOT_OK_PREPEND(
+        down_cast<DiskRowSet*>(rs.get())->MajorCompactDeltaStores(&io_context, GetHistoryGcOpts()),
+        "Failed major delta compaction on " + rs->ToString());
   }
   return Status::OK();
 }
@@ -2079,6 +2105,7 @@ Status Tablet::EstimateBytesInPotentiallyAncientUndoDeltas(int64_t* bytes) {
 Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_ancient_undos) {
   MonoTime tablet_init_start = MonoTime::Now();
 
+  IOContext io_context({ tablet_id() });
   Timestamp ancient_history_mark;
   if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
     VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
@@ -2119,7 +2146,7 @@ Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_an
     const auto& rowset = rowsets[index];
     int64_t rowset_blocks_initialized;
     int64_t rowset_bytes_in_ancient_undos;
-    RETURN_NOT_OK(rowset->InitUndoDeltas(ancient_history_mark, deadline,
+    RETURN_NOT_OK(rowset->InitUndoDeltas(ancient_history_mark, deadline, &io_context,
                                          &rowset_blocks_initialized,
                                          &rowset_bytes_in_ancient_undos));
     tablet_bytes_in_ancient_undos += rowset_bytes_in_ancient_undos;
@@ -2167,10 +2194,11 @@ Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_d
 
   int64_t tablet_blocks_deleted = 0;
   int64_t tablet_bytes_deleted = 0;
+  fs::IOContext io_context({ tablet_id() });
   for (const auto& rowset : rowsets_to_gc_undos) {
     int64_t rowset_blocks_deleted;
     int64_t rowset_bytes_deleted;
-    RETURN_NOT_OK(rowset->DeleteAncientUndoDeltas(ancient_history_mark,
+    RETURN_NOT_OK(rowset->DeleteAncientUndoDeltas(ancient_history_mark, &io_context,
                                                   &rowset_blocks_deleted, &rowset_bytes_deleted));
     tablet_blocks_deleted += rowset_blocks_deleted;
     tablet_bytes_deleted += rowset_bytes_deleted;
@@ -2269,11 +2297,13 @@ string Tablet::LogPrefix() const {
 ////////////////////////////////////////////////////////////
 
 Tablet::Iterator::Iterator(const Tablet* tablet, const Schema& projection,
-                           MvccSnapshot snap, const OrderMode order)
+                           MvccSnapshot snap, OrderMode order,
+                           IOContext io_context)
     : tablet_(tablet),
       projection_(projection),
       snap_(std::move(snap)),
-      order_(order) {}
+      order_(order),
+      io_context_(std::move(io_context)) {}
 
 Tablet::Iterator::~Iterator() {}
 
@@ -2285,7 +2315,8 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
 
   vector<shared_ptr<RowwiseIterator>> iters;
 
-  RETURN_NOT_OK(tablet_->CaptureConsistentIterators(&projection_, snap_, spec, order_, &iters));
+  RETURN_NOT_OK(tablet_->CaptureConsistentIterators(&projection_, snap_, spec, order_,
+                                                    &io_context_, &iters));
 
   switch (order_) {
     case ORDERED:

http://git-wip-us.apache.org/repos/asf/kudu/blob/2974f5a5/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 577b5c7..af6797c 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -34,6 +34,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
 #include "kudu/common/schema.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
@@ -181,7 +182,8 @@ class Tablet {
 
   // Apply a single row operation, which must already be prepared.
   // The result is set back into row_op->result.
-  Status ApplyRowOperation(WriteTransactionState* tx_state,
+  Status ApplyRowOperation(const fs::IOContext* io_context,
+                           WriteTransactionState* tx_state,
                            RowOp* row_op,
                            ProbeStats* stats) WARN_UNUSED_RESULT;
 
@@ -370,6 +372,8 @@ class Tablet {
   // Runs a major delta major compaction on columns with specified IDs.
   // NOTE: RowSet must presently be a DiskRowSet. (Perhaps the API should be
   // a shared_ptr API for now?)
+  //
+  // Only used in tests.
   Status DoMajorDeltaCompaction(const std::vector<ColumnId>& col_ids,
                                 const std::shared_ptr<RowSet>& input_rs);
 
@@ -511,18 +515,21 @@ class Tablet {
   // - the row lock is acquired
   // - the tablet components have been acquired
   // - the operation has been decoded
-  Status InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
+  Status InsertOrUpsertUnlocked(const fs::IOContext* io_context,
+                                WriteTransactionState *tx_state,
                                 RowOp* op,
                                 ProbeStats* stats);
 
   // Same as above, but for UPDATE.
-  Status MutateRowUnlocked(WriteTransactionState *tx_state,
+  Status MutateRowUnlocked(const fs::IOContext* io_context,
+                           WriteTransactionState *tx_state,
                            RowOp* mutate,
                            ProbeStats* stats);
 
   // In the case of an UPSERT against a duplicate row, converts the UPSERT
   // into an internal UPDATE operation and performs it.
-  Status ApplyUpsertAsUpdate(WriteTransactionState *tx_state,
+  Status ApplyUpsertAsUpdate(const fs::IOContext* io_context,
+                             WriteTransactionState *tx_state,
                              RowOp* upsert,
                              RowSet* rowset,
                              ProbeStats* stats);
@@ -535,7 +542,8 @@ class Tablet {
   // For each of the operations in 'tx_state', check for the presence of their
   // row keys in the RowSets in the current RowSetTree (as determined by the transaction's
   // captured TabletComponents).
-  Status BulkCheckPresence(WriteTransactionState* tx_state) WARN_UNUSED_RESULT;
+  Status BulkCheckPresence(const fs::IOContext* io_context,
+                           WriteTransactionState* tx_state) WARN_UNUSED_RESULT;
 
   // Capture a set of iterators which, together, reflect all of the data in the tablet.
   //
@@ -545,11 +553,12 @@ class Tablet {
   //
   // The returned iterators are not Init()ed.
   // 'projection' must remain valid and unchanged for the lifetime of the returned iterators.
-  Status CaptureConsistentIterators(const Schema *projection,
-                                    const MvccSnapshot &snap,
-                                    const ScanSpec *spec,
+  Status CaptureConsistentIterators(const Schema* projection,
+                                    const MvccSnapshot& snap,
+                                    const ScanSpec* spec,
                                     OrderMode order,
-                                    std::vector<std::shared_ptr<RowwiseIterator> > *iters) const;
+                                    const fs::IOContext* io_context,
+                                    std::vector<std::shared_ptr<RowwiseIterator> >* iters) const;
 
   Status PickRowSetsToCompact(RowSetsInCompaction *picked,
                               CompactFlags flags) const;
@@ -762,12 +771,13 @@ class Tablet::Iterator : public RowwiseIterator {
   DISALLOW_COPY_AND_ASSIGN(Iterator);
 
   Iterator(const Tablet* tablet, const Schema& projection, MvccSnapshot snap,
-           const OrderMode order);
+           OrderMode order, fs::IOContext io_context);
 
   const Tablet *tablet_;
   Schema projection_;
   const MvccSnapshot snap_;
   const OrderMode order_;
+  const fs::IOContext io_context_;
   gscoped_ptr<RowwiseIterator> iter_;
 };
 


Mime
View raw message