kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: tablet: clean up MergeIterState API
Date Wed, 09 Jan 2019 07:03:56 GMT
Repository: kudu
Updated Branches:
  refs/heads/master b63d9219e -> 920c21aa1


tablet: clean up MergeIterState API

This patch simply adds documentation and some readability cleanup to
MergeIterState, which is a helper class for the MergeIterator.

Non-API changes:
 - Add API docs to all public methods.
 - Extract the implementation of Advance() and PullNextBlock() out of
   the class declaration to make the API contract easier to quickly read
   and understand.
 - Rename a couple of private variables to clarify row-oriented
   semantics.

API cleanup:
 - Add a public Init() method that delegates to PullNextBlock().
 - Make PullNextBlock() and IsBlockExhausted() private methods.
 - Replace exposure of the underlying iterator via iter() with a public
   AddStats() method, since stats collection was the only use of iter().

There are no functional changes in this patch.

Change-Id: Ie3f821dc06ddbe3f7ef018eec15b1993cde7e7e0
Reviewed-on: http://gerrit.cloudera.org:8080/12176
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/920c21aa
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/920c21aa
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/920c21aa

Branch: refs/heads/master
Commit: 920c21aa123feb04b40df4d310f469678d26a8c4
Parents: b63d921
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Jan 7 16:46:42 2019 -0800
Committer: Adar Dembo <adar@cloudera.com>
Committed: Wed Jan 9 07:03:42 2019 +0000

----------------------------------------------------------------------
 src/kudu/common/generic_iterators.cc | 153 +++++++++++++++++-------------
 1 file changed, 89 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/920c21aa/src/kudu/common/generic_iterators.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 4ac37d4..eb6ebe6 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -78,7 +78,7 @@ void AddIterStats(const RowwiseIterator& iter,
 // Merge iterator
 ////////////////////////////////////////////////////////////
 
-// TODO: size by bytes, not # rows
+// TODO(todd): this should be sized by # bytes, not # rows.
 static const int kMergeRowBuffer = 1000;
 
 // MergeIterState wraps a RowwiseIterator for use by the MergeIterator.
@@ -91,93 +91,118 @@ class MergeIterState {
       arena_(1024),
       read_block_(iter_->schema(), kMergeRowBuffer, &arena_),
       next_row_idx_(0),
-      num_advanced_(0),
-      num_valid_(0)
+      rows_advanced_(0),
+      rows_valid_(0)
   {}
 
-  const RowBlockRow& next_row() {
-    DCHECK_LT(num_advanced_, num_valid_);
+  // Fetch the next row from the iterator. Does not advance the iterator.
+  // IsFullyExhausted() must return false at the time this method is invoked.
+  const RowBlockRow& next_row() const {
+    DCHECK_LT(rows_advanced_, rows_valid_);
     return next_row_;
   }
 
-  Status Advance() {
-    num_advanced_++;
-    if (IsBlockExhausted()) {
-      arena_.Reset();
-      return PullNextBlock();
-    } else {
-      // Seek to the next selected row.
-      SelectionVector *selection = read_block_.selection_vector();
-      for (++next_row_idx_; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
-        if (selection->IsRowSelected(next_row_idx_)) {
-          next_row_.Reset(&read_block_, next_row_idx_);
-          break;
-        }
-      }
-      DCHECK_NE(next_row_idx_, read_block_.nrows()+1) << "No selected rows found!";
-      return Status::OK();
-    }
-  }
-
-  bool IsBlockExhausted() const {
-    return num_advanced_ == num_valid_;
+  // Initialize the underlying iterator to point to the first valid row, if
+  // any. This method should be called before calling any other methods.
+  Status Init() {
+    CHECK_EQ(0, rows_valid_);
+    return PullNextBlock();
   }
 
+  // Returns true if the underlying iterator is fully exhausted.
   bool IsFullyExhausted() const {
-    return num_valid_ == 0 && !iter_->HasNext();
+    return rows_valid_ == 0 && !iter_->HasNext();
   }
 
-  Status PullNextBlock() {
-    CHECK_EQ(num_advanced_, num_valid_)
-      << "should not pull next block until current block is exhausted";
+  // Advance to the next row in the underlying iterator.
+  Status Advance();
 
-    while (iter_->HasNext()) {
-      RETURN_NOT_OK(iter_->NextBlock(&read_block_));
-      num_advanced_ = 0;
-      // Honor the selection vector of the read_block_, since not all rows are necessarily selected.
-      SelectionVector *selection = read_block_.selection_vector();
-      DCHECK_EQ(selection->nrows(), read_block_.nrows());
-      DCHECK_LE(selection->CountSelected(), read_block_.nrows());
-      num_valid_ = selection->CountSelected();
-      VLOG(2) << selection->CountSelected() << "/" << read_block_.nrows() << " rows selected";
-      // Seek next_row_ to the first selected row.
-      for (next_row_idx_ = 0; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
-        if (selection->IsRowSelected(next_row_idx_)) {
-          next_row_.Reset(&read_block_, next_row_idx_);
-          return Status::OK();
-        }
-      }
-      // The block may have had no selected rows, in which case we need to continue
-      // to the next block.
-    }
-
-    // The underlying iterator is fully exhausted.
-    num_advanced_ = 0;
-    num_valid_ = 0;
-    return Status::OK();
+  // Add statistics about the underlying iterator to the given vector.
+  void AddStats(std::vector<IteratorStats>* stats) const {
+    AddIterStats(*iter_, stats);
   }
 
+  // Returns the number of valid rows remaining in the current block.
   size_t remaining_in_block() const {
-    return num_valid_ - num_advanced_;
+    return rows_valid_ - rows_advanced_;
   }
 
-  const shared_ptr<RowwiseIterator>& iter() const {
-    return iter_;
+ private:
+  // Pull the next block from the underlying iterator.
+  Status PullNextBlock();
+
+  // Returns true if the current block in the underlying iterator is exhausted.
+  bool IsBlockExhausted() const {
+    return rows_advanced_ == rows_valid_;
   }
 
   shared_ptr<RowwiseIterator> iter_;
   Arena arena_;
   RowBlock read_block_;
+
   // The row currently pointed to by the iterator.
   RowBlockRow next_row_;
+
   // Row index of next_row_ in read_block_.
   size_t next_row_idx_;
+
   // Number of rows we've advanced past in the current RowBlock.
-  size_t num_advanced_;
+  size_t rows_advanced_;
+
   // Number of valid (selected) rows in the current RowBlock.
-  size_t num_valid_;
+  size_t rows_valid_;
+
+  DISALLOW_COPY_AND_ASSIGN(MergeIterState);
 };
 
+Status MergeIterState::Advance() {
+  rows_advanced_++;
+  if (IsBlockExhausted()) {
+    arena_.Reset();
+    return PullNextBlock();
+  }
+
+  // Seek to the next selected row.
+  SelectionVector *selection = read_block_.selection_vector();
+  for (++next_row_idx_; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
+    if (selection->IsRowSelected(next_row_idx_)) {
+      next_row_.Reset(&read_block_, next_row_idx_);
+      break;
+    }
+  }
+  DCHECK_NE(next_row_idx_, read_block_.nrows()+1) << "No selected rows found!";
+  return Status::OK();
+}
+
+Status MergeIterState::PullNextBlock() {
+  CHECK_EQ(rows_advanced_, rows_valid_)
+      << "should not pull next block until current block is exhausted";
+
+  while (iter_->HasNext()) {
+    RETURN_NOT_OK(iter_->NextBlock(&read_block_));
+    rows_advanced_ = 0;
+    // Honor the selection vector of the read_block_, since not all rows are necessarily selected.
+    SelectionVector *selection = read_block_.selection_vector();
+    DCHECK_EQ(selection->nrows(), read_block_.nrows());
+    DCHECK_LE(selection->CountSelected(), read_block_.nrows());
+    rows_valid_ = selection->CountSelected();
+    VLOG(2) << selection->CountSelected() << "/" << read_block_.nrows() << " rows selected";
+    // Seek next_row_ to the first selected row.
+    for (next_row_idx_ = 0; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
+      if (selection->IsRowSelected(next_row_idx_)) {
+        next_row_.Reset(&read_block_, next_row_idx_);
+        return Status::OK();
+      }
+    }
+    // The block may have had no selected rows, in which case we need to continue
+    // to the next block.
+  }
+
+  // The underlying iterator is fully exhausted.
+  rows_advanced_ = 0;
+  rows_valid_ = 0;
+  return Status::OK();
+}
 
 MergeIterator::MergeIterator(
     const Schema& schema,
@@ -195,12 +220,12 @@ MergeIterator::~MergeIterator() {}
 
 Status MergeIterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
-  // TODO: check that schemas match up!
+  // TODO(todd): check that schemas match up!
 
   RETURN_NOT_OK(InitSubIterators(spec));
 
   for (unique_ptr<MergeIterState> &state : iters_) {
-    RETURN_NOT_OK(state->PullNextBlock());
+    RETURN_NOT_OK(state->Init());
   }
 
   // Before we copy any rows, clean up any iterators which were empty
@@ -263,7 +288,7 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
   dst->Resize(std::min(dst->row_capacity(), available));
 }
 
-// TODO: this is an obvious spot to add codegen - there's a ton of branching
+// TODO(todd): this is an obvious spot to add codegen - there's a ton of branching
 // and such around the comparisons. A simple experiment indicated there's some
 // 2x to be gained.
 Status MergeIterator::MaterializeBlock(RowBlock *dst) {
@@ -298,7 +323,7 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
 
     if (smallest->IsFullyExhausted()) {
       std::lock_guard<rw_spinlock> l(iters_lock_);
-      AddIterStats(*smallest->iter(), &finished_iter_stats_by_col_);
+      smallest->AddStats(&finished_iter_stats_by_col_);
       iters_.erase(iters_.begin() + smallest_idx);
     }
   }
@@ -321,7 +346,7 @@ void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
   *stats = finished_iter_stats_by_col_;
 
   for (const auto& iter_state : iters_) {
-    AddIterStats(*iter_state->iter(), stats);
+    iter_state->AddStats(stats);
   }
 }
 


Mime
View raw message