impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jruss...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-5618: buffered-tuple-stream-v2 fixes
Date Fri, 07 Jul 2017 19:04:32 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master bc1feb34d -> 07d3cea1f


IMPALA-5618: buffered-tuple-stream-v2 fixes

This fixes two issues:
* AddRowCustom() caused a performance regression when the callback
  function object was heap-allocated. This is solved by splitting the
  API into two separate calls. This imposes an additional burden on the
  caller, but makes the performance easier to reason about.
* Allow re-reading streams with 'delete_on_read_' set so long as no rows
  were read from the stream. This is necessary for some spilling ExecNodes
  that prepare the stream for reading in order to acquire the buffer,
  but then need to spill the stream to free memory before they actually
  are able to read the stream.

Change-Id: Ibab0d774f66be632f17376a56abf302821cca047
Reviewed-on: http://gerrit.cloudera.org:8080/7358
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/081ecf01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/081ecf01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/081ecf01

Branch: refs/heads/master
Commit: 081ecf01526449c2360d2d702afc1488b57e07fb
Parents: bc1feb3
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Jul 5 17:55:58 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri Jul 7 08:15:59 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 18 +++--
 be/src/runtime/buffered-tuple-stream-v2.cc      | 51 +++++++-------
 be/src/runtime/buffered-tuple-stream-v2.h       | 72 +++++++++++---------
 .../runtime/buffered-tuple-stream-v2.inline.h   | 12 ++--
 4 files changed, 81 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index 277a564..7e4cef8 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -794,12 +794,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
     for (int j = 0; j < batch->num_rows(); ++j) {
       int fixed_size = tuple_desc.byte_size();
       // Copy fixed portion in, but leave it pointing to row batch's varlen data.
-      ASSERT_TRUE(stream.AddRowCustom(fixed_size,
-          [batch, fixed_size, j](uint8_t* tuple_data) {
-            memcpy(tuple_data, batch->GetRow(j)->GetTuple(0), fixed_size);
-          },
-          &status));
+      uint8_t* tuple_data = stream.AddRowCustomBegin(fixed_size, &status);
+      ASSERT_TRUE(tuple_data != nullptr);
       ASSERT_TRUE(status.ok());
+      memcpy(tuple_data, batch->GetRow(j)->GetTuple(0), fixed_size);
+      stream.AddRowCustomEnd(fixed_size);
     }
     rows_added += batch->num_rows();
   }
@@ -1125,12 +1124,11 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
         fixed_size += tuple_desc->byte_size();
         varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
       }
-      ASSERT_TRUE(stream.AddRowCustom(fixed_size + varlen_size,
-          [this, row, fixed_size, varlen_size](uint8_t* data) {
-            WriteStringRow(string_desc_, row, fixed_size, varlen_size, data);
-          },
-          &status));
+      uint8_t* data = stream.AddRowCustomBegin(fixed_size + varlen_size, &status);
+      ASSERT_TRUE(data != nullptr);
       ASSERT_TRUE(status.ok());
+      WriteStringRow(string_desc_, row, fixed_size, varlen_size, data);
+      stream.AddRowCustomEnd(fixed_size + varlen_size);
     }
     rows_added += batch->num_rows();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index 82da2bc..90d9c12 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -540,6 +540,9 @@ void BufferedTupleStreamV2::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
+  // It is safe to re-read a delete-on-read stream if no rows were read, since in that
+  // case no pages were deleted.
+  if (rows_returned_ == 0) delete_on_read_ = false;
 }
 
 Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) {
@@ -863,39 +866,41 @@ int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
 }
 
 bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept {
-  // Use AddRowCustomSlow() to do the work of advancing the page.
+  // Use AddRowCustom*() to do the work of advancing the page.
   int64_t row_size = ComputeRowSize(row);
-  return AddRowCustomSlow(row_size,
-      [this, row, row_size](uint8_t* data) {
-        bool success = DeepCopy(row, &data, data + row_size);
-        DCHECK(success);
-        DCHECK_EQ(data, write_ptr_);
-      },
-      status);
+  uint8_t* data = AddRowCustomBeginSlow(row_size, status);
+  if (data == nullptr) return false;
+  bool success = DeepCopy(row, &data, data + row_size);
+  DCHECK(success);
+  DCHECK_EQ(data, write_ptr_);
+  AddRowCustomEnd(row_size);
+  return true;
 }
 
-bool BufferedTupleStreamV2::AddRowCustomSlow(
-    int64_t size, const WriteRowFn& write_fn, Status* status) noexcept {
+uint8_t* BufferedTupleStreamV2::AddRowCustomBeginSlow(
+    int64_t size, Status* status) noexcept {
   bool got_reservation;
   *status = AdvanceWritePage(size, &got_reservation);
-  if (!status->ok() || !got_reservation) return false;
+  if (!status->ok() || !got_reservation) return nullptr;
 
   // We have a large-enough page so now success is guaranteed.
-  bool result = AddRowCustom(size, write_fn, status);
-  DCHECK(result);
-  if (size > default_page_len_) {
-    // Immediately unpin the large write page so that we're not using up extra reservation
-    // and so we don't append another row to the page.
-    ResetWritePage();
-    // Save some of the reservation we freed up so we can create the next write page when
-    // needed.
-    if (NeedWriteReservation()) {
-      buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-    }
+  uint8_t* result = AddRowCustomBegin(size, status);
+  DCHECK(result != nullptr);
+  return result;
+}
+
+void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept {
+  DCHECK_GT(size, default_page_len_);
+  // Immediately unpin the large write page so that we're not using up extra reservation
+  // and so we don't append another row to the page.
+  ResetWritePage();
+  // Save some of the reservation we freed up so we can create the next write page when
+  // needed.
+  if (NeedWriteReservation()) {
+    buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   }
   // The stream should be in a consistent state once the row is added.
   CHECK_CONSISTENCY();
-  return true;
 }
 
 bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index c06dc6c..1f21235 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -52,15 +52,15 @@ class TupleRow;
 /// PrepareForReadWrite() is called to initialize both read and write iterators to enable
 /// interleaved reads and writes.
 ///
-/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom()
+/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*()
 /// are called repeatedly to initialize then advance a write iterator through the stream.
 /// Once the stream is fully written, it can be read back by calling PrepareForRead()
 /// then GetNext() repeatedly to advance a read iterator through the stream, or by
 /// calling GetRows() to get all of the rows at once.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
-/// and write iterators. AddRow()/AddRowCustom() then advance a write iterator through the
-/// stream, and GetNext() advances a trailing read iterator through the stream.
+/// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through
+/// the stream, and GetNext() advances a trailing read iterator through the stream.
 ///
 /// Buffer management:
 /// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses
@@ -87,7 +87,7 @@ class TupleRow;
 /// To read or write a row larger than the default page size to/from an unpinned stream,
 /// the client must have max_page_len - default_page_len unused reservation. Writing a
 /// large row to an unpinned stream only uses the reservation for the duration of the
-/// AddRow()/AddRowCustom() call. Reading a large row from an unpinned stream uses the
+/// AddRow()/AddRowCustom*() call. Reading a large row from an unpinned stream uses the
 /// reservation until the next call to GetNext(). E.g. to partition a single unpinned
 /// stream into n unpinned streams, the reservation needed is (n - 1) *
 /// default_page_len + 2 * max_page_len: one large read buffer and one large write
@@ -187,15 +187,16 @@ class TupleRow;
 /// the stream may be freed on the next call to GetNext().
 /// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
 ///
-/// Manual construction of rows with AddRowCustom():
-/// The BufferedTupleStream supports allocation of uninitialized rows with AddRowCustom().
-/// AddRowCustom() is called instead of AddRow() if the client wants to manually construct
-/// a row. The caller of AddRowCustom() is responsible for providing a callback function
-/// that writes the row with exactly the layout described above.
+/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
+/// The BufferedTupleStream supports allocation of uninitialized rows with
+/// AddRowCustom*(). AddRowCustomBegin() is called instead of AddRow() if the client wants
+/// to manually construct a row. The caller of AddRowCustomBegin() is responsible for
+/// writing the row with exactly the layout described above then calling
+/// AddRowCustomEnd() when done.
 ///
 /// If a caller constructs a tuple in this way, the caller can set the pointers and they
 /// will not be modified until the stream is read via GetNext() or GetRows().
-/// TODO: IMPALA-5007: try to remove AddRowCustom() by unifying with AddRow().
+/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
 /// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
 /// page will need to be pinned soon.
@@ -223,7 +224,8 @@ class BufferedTupleStreamV2 {
 
   /// Prepares the stream for writing by saving enough reservation for a default-size
   /// write page. Tries to increase reservation if there is not enough unused reservation
-  /// for a page. Called after Init() and before the first AddRow() or AddRowCustom() call.
+  /// for a page. Called after Init() and before the first AddRow() or
+  /// AddRowCustomBegin() call.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first write page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
@@ -231,8 +233,8 @@ class BufferedTupleStreamV2 {
 
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
-  /// AddRow() or AddRowCustom() call.
-  /// delete_on_read: Pages are deleted after they are read.
+  /// AddRow() or AddRowCustomBegin() call.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
@@ -240,11 +242,11 @@ class BufferedTupleStreamV2 {
       bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
-  /// Therefore must be called after the last AddRow() or AddRowCustom() and before
+  /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
-  /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was previously
-  /// called with delete_on_read = true.
-  /// delete_on_read: Pages are deleted after they are read.
+  /// over the stream, unless rows were read from the stream after PrepareForRead() or
+  /// PrepareForReadWrite() was called with delete_on_read = true.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
@@ -271,21 +273,19 @@ class BufferedTupleStreamV2 {
   /// returns an error, it should not be called again.
   bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
 
-  /// A function that writes a row to 'data' with the format described in the class
-  /// comment.
+  /// Allocates space to store a row of 'size' bytes (including fixed and variable length
+  /// data). If successful, returns a pointer to the allocated row. The caller then must
+  /// write valid data to the row and call AddRowCustomEnd().
   ///
-  /// Use boost::function instead of std::function because it is better at avoiding heap
-  /// allocations when capturing a small number of variables. In GCC 4.9.2/Boost 1.57,
-  /// boost::function can store up to 3 64-bit pointers without making a heap allocation,
-  /// but std::function always makes a heap allocation.
-  typedef boost::function<void(uint8_t* data)> WriteRowFn;
+  /// If unsuccessful, returns nullptr. The failure modes are the same as described in the
+  /// AddRow() comment.
+  ALWAYS_INLINE uint8_t* AddRowCustomBegin(int64_t size, Status* status);
 
-  /// Allocates space to store a row of 'size' bytes (including fixed and variable length
-  /// data). If successful, calls 'write_fn' with a pointer to the start of the allocated
-  /// space and returns true. Otherwise returns false. The failure modes are the same as
-  /// described in the AddRow() comment.
-  ALWAYS_INLINE bool AddRowCustom(
-      int64_t size, const WriteRowFn& write_fn, Status* status);
+  /// Called after AddRowCustomBegin() when done writing the row. Should only be called
+  /// if AddRowCustomBegin() succeeded. See the AddRowCustomBegin() comment for
+  /// explanation.
+  /// 'size': the size passed into AddRowCustomBegin().
+  void AddRowCustomEnd(int64_t size);
 
   /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the
   /// stream is pinned. The row must have been allocated with the stream's row desc.
@@ -476,7 +476,7 @@ class BufferedTupleStreamV2 {
 
   /// The current page for writing. NULL if there is no write iterator or no current
   /// write page. Always pinned. Size is 'default_page_len_', except temporarily while
-  /// appending a larger row in AddRowCustomSlow().
+  /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
   Page* write_page_;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
@@ -508,7 +508,8 @@ class BufferedTupleStreamV2 {
   /// Whether any tuple in the rows is nullable.
   const bool has_nullable_tuple_;
 
-  /// If true, pages are deleted after they are read.
+  /// If true, pages are deleted after they are read during this read pass. Once rows
+  /// have been read from a stream with 'delete_on_read_' true, this is always true.
   bool delete_on_read_;
 
   bool closed_; // Used for debugging.
@@ -532,9 +533,12 @@ class BufferedTupleStreamV2 {
   /// the current page.
   bool AddRowSlow(TupleRow* row, Status* status) noexcept;
 
-  /// The slow path for AddRowCustom() that is called if there is not sufficient space in
+  /// The slow path for AddRowCustomBegin() that is called if there is not sufficient space in
   /// the current page.
-  bool AddRowCustomSlow(int64_t size, const WriteRowFn& write_fn, Status* status) noexcept;
+  uint8_t* AddRowCustomBeginSlow(int64_t size, Status* status) noexcept;
+
+  /// The slow path for AddRowCustomEnd() that is called for rows larger than the default
+  /// page size.
+  void AddLargeRowCustomEnd(int64_t size) noexcept;
 
   /// Copies 'row' into the buffer starting at *data and ending at the byte before
   /// 'data_end'. On success, returns true and updates *data to point after the last

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h
index a3b219c..7022249 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.inline.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h
@@ -31,12 +31,11 @@ inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
   return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
 }
 
-inline bool BufferedTupleStreamV2::AddRowCustom(
-    int64_t size, const WriteRowFn& write_fn, Status* status) {
+inline uint8_t* BufferedTupleStreamV2::AddRowCustomBegin(int64_t size, Status* status) {
   DCHECK(!closed_);
   DCHECK(has_write_iterator());
   if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) {
-    return AddRowCustomSlow(size, write_fn, status);
+    return AddRowCustomBeginSlow(size, status);
   }
   DCHECK(write_page_ != nullptr);
   DCHECK(write_page_->is_pinned());
@@ -46,8 +45,11 @@ inline bool BufferedTupleStreamV2::AddRowCustom(
 
   uint8_t* data = write_ptr_;
   write_ptr_ += size;
-  write_fn(data);
-  return true;
+  return data;
+}
+
+inline void BufferedTupleStreamV2::AddRowCustomEnd(int64_t size) {
+  if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size);
 }
 }
 


Mime
View raw message