impala-commits mailing list archives

From tarmstr...@apache.org
Subject [2/3] incubator-impala git commit: IMPALA-4674: Part 1: port BufferedTupleStream to BufferPool
Date Thu, 16 Mar 2017 22:52:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
new file mode 100644
index 0000000..083b59e
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -0,0 +1,812 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/buffered-tuple-stream-v2.inline.h"
+
+#include <boost/bind.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/collection-value.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/string-value.h"
+#include "runtime/tuple-row.h"
+#include "util/bit-util.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
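+// CheckConsistency() walks every page in the stream, so the consistency checks
+// are compiled out of release builds (NDEBUG).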
+#ifdef NDEBUG
+#define CHECK_CONSISTENCY()
+#else
+#define CHECK_CONSISTENCY() CheckConsistency()
+#endif
+
+using namespace impala;
+using namespace strings;
+
+BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
+    const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
+    int64_t page_len, const set<SlotId>& ext_varlen_slots)
+  : state_(state),
+    desc_(row_desc),
+    buffer_pool_(state->exec_env()->buffer_pool()),
+    buffer_pool_client_(buffer_pool_client),
+    total_byte_size_(0),
+    read_page_rows_returned_(-1),
+    read_ptr_(nullptr),
+    write_ptr_(nullptr),
+    write_end_ptr_(nullptr),
+    rows_returned_(0),
+    write_page_(nullptr),
+    bytes_pinned_(0),
+    num_rows_(0),
+    page_len_(page_len),
+    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    delete_on_read_(false),
+    closed_(false),
+    pinned_(true) {
+  read_page_ = pages_.end();
+  fixed_tuple_row_size_ = 0;
+  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+    const int tuple_byte_size = tuple_desc->byte_size();
+    fixed_tuple_sizes_.push_back(tuple_byte_size);
+    fixed_tuple_row_size_ += tuple_byte_size;
+
+    vector<SlotDescriptor*> tuple_string_slots;
+    vector<SlotDescriptor*> tuple_coll_slots;
+    for (int j = 0; j < tuple_desc->slots().size(); ++j) {
+      SlotDescriptor* slot = tuple_desc->slots()[j];
+      if (!slot->type().IsVarLenType()) continue;
+      if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) {
+        if (slot->type().IsVarLenStringType()) {
+          tuple_string_slots.push_back(slot);
+        } else {
+          DCHECK(slot->type().IsCollectionType());
+          tuple_coll_slots.push_back(slot);
+        }
+      }
+    }
+    if (!tuple_string_slots.empty()) {
+      inlined_string_slots_.push_back(make_pair(i, tuple_string_slots));
+    }
+
+    if (!tuple_coll_slots.empty()) {
+      inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots));
+    }
+  }
+  if (has_nullable_tuple_) fixed_tuple_row_size_ += NullIndicatorBytesPerRow();
+}
+
+BufferedTupleStreamV2::~BufferedTupleStreamV2() {
+  DCHECK(closed_);
+}
+
+void BufferedTupleStreamV2::CheckConsistency() const {
+  DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
+  for (const Page& page : pages_) {
+    DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString();
+  }
+  if (has_write_iterator()) {
+    DCHECK(write_page_->is_pinned());
+    DCHECK_GE(write_ptr_, write_page_->data());
+    DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len());
+    DCHECK_GE(write_end_ptr_, write_ptr_);
+  }
+  if (has_read_iterator()) {
+    DCHECK(read_page_->is_pinned());
+    uint8_t* read_end_ptr = read_page_->data() + read_page_->len();
+    DCHECK_GE(read_ptr_, read_page_->data());
+    DCHECK_GE(read_end_ptr, read_ptr_);
+  }
+}
+
+string BufferedTupleStreamV2::DebugString() const {
+  stringstream ss;
+  ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
+     << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
+     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_
+     << " bytes_pinned=" << bytes_pinned_ << " write_page=" << write_page_
+     << " read_page=";
+  if (!has_read_iterator()) {
+    ss << "<end>";
+  } else {
+    ss << &*read_page_;
+  }
+  ss << " pages=[\n";
+  for (const Page& page : pages_) {
+    ss << "{" << page.DebugString() << "}";
+    if (&page != &pages_.back()) ss << ",\n";
+  }
+  ss << "]";
+  return ss.str();
+}
+
+string BufferedTupleStreamV2::Page::DebugString() const {
+  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
+}
+
+Status BufferedTupleStreamV2::Init(int node_id, bool pinned) {
+  if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) {
+  // This must be the first iterator created.
+  DCHECK(pages_.empty());
+  DCHECK(!delete_on_read_);
+  DCHECK(!has_write_iterator());
+  DCHECK(!has_read_iterator());
+  CHECK_CONSISTENCY();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  if (!*got_reservation) return Status::OK();
+  RETURN_IF_ERROR(NewWritePage());
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PrepareForReadWrite(
+    bool delete_on_read, bool* got_reservation) {
+  // This must be the first iterator created.
+  DCHECK(pages_.empty());
+  DCHECK(!delete_on_read_);
+  DCHECK(!has_write_iterator());
+  DCHECK(!has_read_iterator());
+  CHECK_CONSISTENCY();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * page_len_);
+  if (!*got_reservation) return Status::OK();
+  RETURN_IF_ERROR(NewWritePage());
+  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
+  for (Page& page : pages_) {
+    if (batch != nullptr && page.is_pinned()) {
+      BufferPool::BufferHandle buffer;
+      buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
+    } else {
+      buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
+    }
+  }
+  pages_.clear();
+  bytes_pinned_ = 0;
+  closed_ = true;
+}
+
+int64_t BufferedTupleStreamV2::CalcBytesPinned() const {
+  int64_t result = 0;
+  for (const Page& page : pages_) result += page.pin_count() * page.len();
+  return result;
+}
+
+Status BufferedTupleStreamV2::PinPage(Page* page) {
+  RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle));
+  bytes_pinned_ += page->len();
+  return Status::OK();
+}
+
+int BufferedTupleStreamV2::ExpectedPinCount(bool stream_pinned, const Page* page) const {
+  int pin_count = 0;
+  if (stream_pinned && has_write_iterator() && has_read_iterator()) {
+    // The stream is pinned, so every page has one pin for that (this pin is taken
+    // over by the read iterator when the stream is unpinned).
+    pin_count++;
+    // The write iterator gets its own pin so that we can unpin the stream without
+    // needing additional reservation.
+    if (is_write_page(page)) pin_count++;
+  } else if (stream_pinned) {
+    // The stream is pinned and has at most one active iterator. When the stream is
+    // unpinned, either the read or write iterator can take over this pin.
+    pin_count++;
+  } else {
+    // The stream is unpinned. Each active iterator gets its own pin.
+    if (is_read_page(page)) pin_count++;
+    if (is_write_page(page)) pin_count++;
+  }
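+  // Example: in an unpinned stream where the read and write iterators sit on the
+  // same page, that page's expected pin count is 2 (one pin per iterator); once
+  // the read iterator advances past it, the expected count drops back to 1.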
+  return pin_count;
+}
+
+Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) {
+  int new_pin_count = ExpectedPinCount(stream_pinned, page);
+  if (new_pin_count != page->pin_count()) {
+    DCHECK_EQ(new_pin_count, page->pin_count() + 1);
+    RETURN_IF_ERROR(PinPage(page));
+  }
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
+  int new_pin_count = ExpectedPinCount(stream_pinned, page);
+  if (new_pin_count != page->pin_count()) {
+    DCHECK_EQ(new_pin_count, page->pin_count() - 1);
+    buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
+    bytes_pinned_ -= page->len();
+  }
+}
+
+Status BufferedTupleStreamV2::NewWritePage() noexcept {
+  DCHECK(!closed_);
+  DCHECK(!has_write_iterator());
+
+  Page new_page;
+  RETURN_IF_ERROR(
+      buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle));
+  bytes_pinned_ += page_len_;
+  total_byte_size_ += page_len_;
+
+  pages_.push_back(std::move(new_page));
+  write_page_ = &pages_.back();
+  DCHECK_EQ(write_page_->num_rows, 0);
+  write_ptr_ = write_page_->data();
+  write_end_ptr_ = write_page_->data() + page_len_;
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::CheckPageSizeForRow(int64_t row_size) {
+  // TODO: IMPALA-3208: need to rework this logic to support large pages - should pick
+  // next power-of-two size.
+  if (UNLIKELY(row_size > page_len_)) {
+    // TODO: IMPALA-3208: change the message to reference the query option controlling
+    // max row size.
+    return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
+        PrettyPrinter::Print(row_size, TUnit::BYTES),
+        PrettyPrinter::Print(page_len_, TUnit::BYTES));
+  }
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::AdvanceWritePage(
+    int64_t row_size, bool* got_reservation) noexcept {
+  CHECK_CONSISTENCY();
+
+  // Get ready to move to the next write page by unsetting 'write_page_' and
+  // potentially (depending on the mode of this stream) freeing up reservation for the
+  // next write page.
+  ResetWritePage();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(row_size));
+  // May need to pin the new page for both reading and writing. See ExpectedPinCount().
+  bool pin_for_read = has_read_iterator() && pinned_;
+  int64_t new_page_reservation = pin_for_read ? 2 * page_len_ : page_len_;
+  if (!buffer_pool_client_->IncreaseReservationToFit(new_page_reservation)) {
+    *got_reservation = false;
+    return Status::OK();
+  }
+  RETURN_IF_ERROR(NewWritePage());
+  // We may need to pin the page for reading also.
+  if (pin_for_read) RETURN_IF_ERROR(PinPage(write_page_));
+
+  CHECK_CONSISTENCY();
+  *got_reservation = true;
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::ResetWritePage() {
+  if (!has_write_iterator()) return;
+  // Unpin the write page if the stream is in unpinned mode.
+  Page* prev_write_page = write_page_;
+  write_page_ = nullptr;
+
+  // May need to decrement pin count now that it's not the write page, depending on
+  // the stream's mode.
+  UnpinPageIfNeeded(prev_write_page, pinned_);
+}
+
+Status BufferedTupleStreamV2::NextReadPage() {
+  DCHECK(!closed_);
+  CHECK_CONSISTENCY();
+
+  if (delete_on_read_) {
+    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " << DebugString();
+    DCHECK_NE(&*read_page_, write_page_);
+    bytes_pinned_ -= pages_.front().len();
+    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
+    pages_.pop_front();
+    read_page_ = pages_.begin();
+  } else {
+    // Unpin pages after reading them if needed.
+    Page* prev_read_page = &*read_page_;
+    ++read_page_;
+    UnpinPageIfNeeded(prev_read_page, pinned_);
+  }
+
+  if (!has_read_iterator()) {
+    CHECK_CONSISTENCY();
+    return Status::OK();
+  }
+
+  // Ensure the next page is pinned for reading. If the stream is unpinned, we freed up
+  // enough reservation by deleting or unpinning the previous page.
+  // TODO: IMPALA-3208: this page may be larger than the previous, so this could
+  // actually fail once we have variable-length pages.
+  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+
+  read_page_rows_returned_ = 0;
+  read_ptr_ = read_page_->data();
+
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::ResetReadPage() {
+  if (!has_read_iterator()) return;
+  // Unpin the read page if the stream is in unpinned mode.
+  Page* prev_read_page = &*read_page_;
+  read_page_ = pages_.end();
+
+  // May need to decrement pin count after destroying read iterator.
+  UnpinPageIfNeeded(prev_read_page, pinned_);
+}
+
+Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) {
+  CHECK_CONSISTENCY();
+  ResetWritePage();
+  ResetReadPage();
+  // If already pinned, no additional pin is needed (see ExpectedPinCount()).
+  *got_reservation = pinned_ || buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  if (!*got_reservation) return Status::OK();
+  return PrepareForReadInternal(delete_on_read);
+}
+
+Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
+  DCHECK(!closed_);
+  DCHECK(!delete_on_read_);
+  DCHECK(!pages_.empty());
+  DCHECK(!has_read_iterator());
+
+  // Check if we need to increment the pin count of the read page.
+  read_page_ = pages_.begin();
+  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+
+  DCHECK(has_read_iterator());
+  DCHECK(read_page_->is_pinned());
+  read_page_rows_returned_ = 0;
+  read_ptr_ = read_page_->data();
+  rows_returned_ = 0;
+  delete_on_read_ = delete_on_read;
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PinStream(bool* pinned) {
+  DCHECK(!closed_);
+  CHECK_CONSISTENCY();
+  if (pinned_) {
+    *pinned = true;
+    return Status::OK();
+  }
+  *pinned = false;
+  // First, make sure we have the reservation to pin all the pages for reading.
+  int64_t bytes_to_pin = 0;
+  for (Page& page : pages_) {
+    bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len();
+  }
+  bool reservation_granted = buffer_pool_client_->IncreaseReservationToFit(bytes_to_pin);
+  if (!reservation_granted) return Status::OK();
+
+  // At this point success is guaranteed: pin every page that still needs a pin.
+  for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));
+
+  pinned_ = true;
+  *pinned = true;
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
+  DCHECK(!closed_);
+  if (mode == UNPIN_ALL) {
+    // Invalidate the iterators so they don't keep pages pinned.
+    ResetWritePage();
+    ResetReadPage();
+  }
+
+  if (pinned_) {
+    // If the stream was pinned, there may be some remaining pinned pages that should
+    // be unpinned at this point.
+    for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
+    pinned_ = false;
+  }
+  CHECK_CONSISTENCY();
+}
+
+Status BufferedTupleStreamV2::GetRows(
+    MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) {
+  if (num_rows() > numeric_limits<int>::max()) {
+    // RowBatch::num_rows_ is a 32-bit int, so avoid an overflow.
+    return Status(Substitute("Cannot read $0 rows into an in-memory batch: the "
+                             "limit is $1 rows.",
+        num_rows(), numeric_limits<int>::max()));
+  }
+  RETURN_IF_ERROR(PinStream(got_rows));
+  if (!*got_rows) return Status::OK();
+  bool got_reservation;
+  RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
+  DCHECK(got_reservation) << "Stream was pinned";
+  batch->reset(new RowBatch(desc_, num_rows(), tracker));
+  bool eos = false;
+  // Loop until GetNext() fills the entire batch. Each call can stop at page
+  // boundaries; that normally lets pages be freed as we read, but looping is
+  // safe here because the entire stream is pinned.
+  while (!eos) {
+    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
+  }
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::GetNext(RowBatch* batch, bool* eos) {
+  return GetNextInternal<false>(batch, eos, nullptr);
+}
+
+Status BufferedTupleStreamV2::GetNext(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  return GetNextInternal<true>(batch, eos, flat_rows);
+}
+
+template <bool FILL_FLAT_ROWS>
+Status BufferedTupleStreamV2::GetNextInternal(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  if (has_nullable_tuple_) {
+    return GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows);
+  } else {
+    return GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows);
+  }
+}
+
+template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
+Status BufferedTupleStreamV2::GetNextInternal(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  DCHECK(!closed_);
+  DCHECK(batch->row_desc().Equals(desc_));
+  DCHECK(is_pinned() || !FILL_FLAT_ROWS)
+      << "FlatRowPtrs are only valid for pinned streams";
+  *eos = (rows_returned_ == num_rows_);
+  if (*eos) return Status::OK();
+
+  if (UNLIKELY(read_page_rows_returned_ == read_page_->num_rows)) {
+    // Get the next page in the stream. This must happen at the beginning of the
+    // GetNext() call because NextReadPage() may unpin or delete the buffer backing
+    // the rows returned from the *previous* call to GetNext().
+    RETURN_IF_ERROR(NextReadPage());
+  }
+
+  DCHECK(has_read_iterator());
+  DCHECK(read_page_->is_pinned()) << DebugString();
+  DCHECK_GE(read_page_rows_returned_, 0);
+
+  int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_;
+  int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page);
+  DCHECK_GE(rows_to_fill, 1);
+  uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
+
+  // Produce tuple rows from the current read position in the page, consuming the
+  // rows' null tuple indicators (if any) along the way.
+  if (FILL_FLAT_ROWS) {
+    DCHECK(flat_rows != nullptr);
+    DCHECK(!delete_on_read_);
+    DCHECK_EQ(batch->num_rows(), 0);
+    flat_rows->clear();
+    flat_rows->reserve(rows_to_fill);
+  }
+
+  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  // Start reading from the current position in 'read_page_'.
+  for (int i = 0; i < rows_to_fill; ++i) {
+    if (FILL_FLAT_ROWS) {
+      flat_rows->push_back(read_ptr_);
+      DCHECK_EQ(flat_rows->size(), i + 1);
+    }
+    // Copy the row into the output batch.
+    TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
+    tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
+    UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row);
+
+    // Update string slot ptrs, skipping external strings.
+    for (int j = 0; j < inlined_string_slots_.size(); ++j) {
+      Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first);
+      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+      FixUpStringsForRead(inlined_string_slots_[j].second, tuple);
+    }
+
+    // Update collection slot ptrs, skipping external collections. We traverse the
+    // collection structure in the same order as it was written to the stream, allowing
+    // us to infer the data layout based on the length of collections and strings.
+    for (int j = 0; j < inlined_coll_slots_.size(); ++j) {
+      Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first);
+      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+      FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple);
+    }
+  }
+
+  batch->CommitRows(rows_to_fill);
+  rows_returned_ += rows_to_fill;
+  read_page_rows_returned_ += rows_to_fill;
+  *eos = (rows_returned_ == num_rows_);
+  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
+    // No more data in this page. The batch must be immediately returned up the operator
+    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
+    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
+    batch->MarkNeedsDeepCopy();
+  }
+  if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
+  DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len());
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::FixUpStringsForRead(
+    const vector<SlotDescriptor*>& string_slots, Tuple* tuple) {
+  DCHECK(tuple != nullptr);
+  for (const SlotDescriptor* slot_desc : string_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+
+    StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
+    DCHECK_LE(read_ptr_ + sv->len, read_page_->data() + read_page_->len());
+    sv->ptr = reinterpret_cast<char*>(read_ptr_);
+    read_ptr_ += sv->len;
+  }
+}
+
+void BufferedTupleStreamV2::FixUpCollectionsForRead(
+    const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) {
+  DCHECK(tuple != nullptr);
+  for (const SlotDescriptor* slot_desc : collection_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+
+    CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
+    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+    int coll_byte_size = cv->num_tuples * item_desc.byte_size();
+    DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len());
+    cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
+    read_ptr_ += coll_byte_size;
+
+    if (!item_desc.HasVarlenSlots()) continue;
+    uint8_t* coll_data = cv->ptr;
+    for (int i = 0; i < cv->num_tuples; ++i) {
+      Tuple* item = reinterpret_cast<Tuple*>(coll_data);
+      FixUpStringsForRead(item_desc.string_slots(), item);
+      FixUpCollectionsForRead(item_desc.collection_slots(), item);
+      coll_data += item_desc.byte_size();
+    }
+  }
+}
+
+int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
+  int64_t size = 0;
+  if (has_nullable_tuple_) {
+    size += NullIndicatorBytesPerRow();
+    for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
+      if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i];
+    }
+  } else {
+    size = fixed_tuple_row_size_;
+  }
+  for (int i = 0; i < inlined_string_slots_.size(); ++i) {
+    Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
+    if (tuple == nullptr) continue;
+    const vector<SlotDescriptor*>& slots = inlined_string_slots_[i].second;
+    for (auto it = slots.begin(); it != slots.end(); ++it) {
+      if (tuple->IsNull((*it)->null_indicator_offset())) continue;
+      size += tuple->GetStringSlot((*it)->tuple_offset())->len;
+    }
+  }
+
+  for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
+    Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
+    if (tuple == nullptr) continue;
+    const vector<SlotDescriptor*>& slots = inlined_coll_slots_[i].second;
+    for (auto it = slots.begin(); it != slots.end(); ++it) {
+      if (tuple->IsNull((*it)->null_indicator_offset())) continue;
+      CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset());
+      const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor();
+      size += cv->num_tuples * item_desc.byte_size();
+
+      if (!item_desc.HasVarlenSlots()) continue;
+      for (int j = 0; j < cv->num_tuples; ++j) {
+        Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]);
+        size += item->VarlenByteSize(item_desc);
+      }
+    }
+  }
+  return size;
+}
+
+bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept {
+  bool got_reservation;
+  *status = AdvanceWritePage(ComputeRowSize(row), &got_reservation);
+  if (!status->ok() || !got_reservation) return false;
+  return DeepCopy(row);
+}
+
+uint8_t* BufferedTupleStreamV2::AllocateRowSlow(
+    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept {
+  int64_t row_size = static_cast<int64_t>(fixed_size) + varlen_size;
+  bool got_reservation;
+  *status = AdvanceWritePage(row_size, &got_reservation);
+  if (!status->ok() || !got_reservation) return nullptr;
+
+  // We have a large enough page, so success is now guaranteed.
+  uint8_t* result = AllocateRow(fixed_size, varlen_size, varlen_data, status);
+  DCHECK(result != nullptr);
+  return result;
+}
+
+bool BufferedTupleStreamV2::DeepCopy(TupleRow* row) noexcept {
+  if (has_nullable_tuple_) {
+    return DeepCopyInternal<true>(row);
+  } else {
+    return DeepCopyInternal<false>(row);
+  }
+}
+
+// TODO: consider codegening this.
+// TODO: in case of duplicate tuples, this can redundantly serialize data.
+template <bool HAS_NULLABLE_TUPLE>
+bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
+  if (UNLIKELY(write_page_ == nullptr)) return false;
+  DCHECK(write_page_->is_pinned()) << DebugString() << std::endl
+                                   << write_page_->DebugString();
+
+  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  uint32_t bytes_remaining = write_end_ptr_ - write_ptr_;
+
+  // Fail if there is not enough space to append the fixed-length part of the row;
+  // the caller will then advance to a new write page.
+  if (UNLIKELY(bytes_remaining < fixed_tuple_row_size_)) return false;
+
+  // Copy the non-NULL fixed-length tuples. For NULL tuples, just set the null
+  // tuple indicator.
+  if (HAS_NULLABLE_TUPLE) {
+    uint8_t* null_indicators = write_ptr_;
+    int null_indicator_bytes = NullIndicatorBytesPerRow();
+    memset(null_indicators, 0, null_indicator_bytes);
+    write_ptr_ += NullIndicatorBytesPerRow();
+    for (int i = 0; i < tuples_per_row; ++i) {
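+      // Tuple i's null bit lives in byte (i / 8) of the indicator region, at bit
+      // (i % 8) counting from the most-significant bit.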
+      uint8_t* null_word = null_indicators + (i >> 3);
+      const uint32_t null_pos = i & 7;
+      const int tuple_size = fixed_tuple_sizes_[i];
+      Tuple* t = row->GetTuple(i);
+      const uint8_t mask = 1 << (7 - null_pos);
+      if (t != nullptr) {
+        memcpy(write_ptr_, t, tuple_size);
+        write_ptr_ += tuple_size;
+      } else {
+        *null_word |= mask;
+      }
+    }
+  } else {
+    // There are no nullable tuples, so there is no need to set null indicators.
+    for (int i = 0; i < tuples_per_row; ++i) {
+      const int tuple_size = fixed_tuple_sizes_[i];
+      Tuple* t = row->GetTuple(i);
+      // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots)
+      // is delivered, the check below should become DCHECK(t != nullptr).
+      DCHECK(t != nullptr || tuple_size == 0);
+      memcpy(write_ptr_, t, tuple_size);
+      write_ptr_ += tuple_size;
+    }
+  }
+
+  // Copy inlined string slots. Note: the string pointers are not converted to
+  // offsets on the write path; they are fixed up on the read path instead. The
+  // tuple data is immediately followed by the string data, so only the length
+  // information is needed to locate it.
+  for (int i = 0; i < inlined_string_slots_.size(); ++i) {
+    const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
+    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) return false;
+  }
+
+  // Copy inlined collection slots. We copy collection data in a well-defined order so
+  // we do not need to convert pointers to offsets on the write path.
+  for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
+    const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
+    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+    if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) return false;
+  }
+
+  ++num_rows_;
+  ++write_page_->num_rows;
+  return true;
+}
+
+bool BufferedTupleStreamV2::CopyStrings(
+    const Tuple* tuple, const vector<SlotDescriptor*>& string_slots) {
+  for (const SlotDescriptor* slot_desc : string_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
+    if (LIKELY(sv->len > 0)) {
+      if (UNLIKELY(write_ptr_ + sv->len > write_end_ptr_)) return false;
+
+      memcpy(write_ptr_, sv->ptr, sv->len);
+      write_ptr_ += sv->len;
+    }
+  }
+  return true;
+}
+
+bool BufferedTupleStreamV2::CopyCollections(
+    const Tuple* tuple, const vector<SlotDescriptor*>& collection_slots) {
+  for (const SlotDescriptor* slot_desc : collection_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
+    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+    if (LIKELY(cv->num_tuples > 0)) {
+      int coll_byte_size = cv->num_tuples * item_desc.byte_size();
+      if (UNLIKELY(write_ptr_ + coll_byte_size > write_end_ptr_)) return false;
+      uint8_t* coll_data = write_ptr_;
+      memcpy(coll_data, cv->ptr, coll_byte_size);
+      write_ptr_ += coll_byte_size;
+
+      if (!item_desc.HasVarlenSlots()) continue;
+      // Copy variable length data when present in collection items.
+      for (int i = 0; i < cv->num_tuples; ++i) {
+        const Tuple* item = reinterpret_cast<Tuple*>(coll_data);
+        if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false;
+        if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false;
+        coll_data += item_desc.byte_size();
+      }
+    }
+  }
+  return true;
+}
+
+void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
+  DCHECK(row != nullptr);
+  DCHECK(!closed_);
+  DCHECK(is_pinned());
+  DCHECK(!delete_on_read_);
+  uint8_t* data = flat_row;
+  return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
+                               UnflattenTupleRow<false>(&data, row);
+}
+
+template <bool HAS_NULLABLE_TUPLE>
+void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
+  const int tuples_per_row = desc_.tuple_descriptors().size();
+  uint8_t* ptr = *data;
+  if (HAS_NULLABLE_TUPLE) {
+    // Stitch together the tuples from the page and the NULL ones.
+    const uint8_t* null_indicators = ptr;
+    ptr += NullIndicatorBytesPerRow();
+    for (int i = 0; i < tuples_per_row; ++i) {
+      const uint8_t* null_word = null_indicators + (i >> 3);
+      const uint32_t null_pos = i & 7;
+      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
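+      // Branchless select: multiplying the pointer value by 0 or 1 yields nullptr
+      // for NULL tuples without a data-dependent branch.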
+      row->SetTuple(
+          i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
+      ptr += fixed_tuple_sizes_[i] * is_not_null;
+    }
+  } else {
+    for (int i = 0; i < tuples_per_row; ++i) {
+      row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
+      ptr += fixed_tuple_sizes_[i];
+    }
+  }
+  *data = ptr;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
new file mode 100644
index 0000000..d707604
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
+#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
+
+#include <set>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gutil/macros.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class MemTracker;
+class RuntimeState;
+class RowDescriptor;
+class SlotDescriptor;
+class Tuple;
+class TupleRow;
+
+/// Class that provides an abstraction for a stream of tuple rows backed by BufferPool
+/// Pages. Rows can be added to the stream and read back. Rows are returned in the order
+/// they are added.
+///
+/// The BufferedTupleStream is *not* thread safe from the caller's point of view.
+/// Different threads should not concurrently call methods of the same BufferedTupleStream
+/// object.
+///
+/// Reading and writing the stream:
+/// The stream supports two modes of reading/writing, depending on whether
+/// PrepareForWrite() is called to initialize a write iterator only or
+/// PrepareForReadWrite() is called to initialize both read and write iterators to enable
+/// interleaved reads and writes.
+///
+/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AllocateRow()
+/// are called repeatedly to initialize then advance a write iterator through the stream.
+/// Once the stream is fully written, it can be read back by calling PrepareForRead()
+/// then GetNext() repeatedly to advance a read iterator through the stream, or by
+/// calling GetRows() to get all of the rows at once.
+///
+/// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
+/// and write iterators. AddRow()/AllocateRow() then advance a write iterator through the
+/// stream, and GetNext() advances a trailing read iterator through the stream.
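+///
+/// A write-then-read sketch (error handling abbreviated; 'client' is assumed to be
+/// a BufferPool::ClientHandle with reservation available, 'PAGE_LEN' a caller-chosen
+/// page size and 'rows' the caller's input rows):
+///
+///   BufferedTupleStreamV2 stream(state, row_desc, client, PAGE_LEN);
+///   RETURN_IF_ERROR(stream.Init(node_id, true /* pinned */));
+///   bool got_reservation;
+///   RETURN_IF_ERROR(stream.PrepareForWrite(&got_reservation));
+///   if (!got_reservation) { /* free up memory or fail the query */ }
+///   Status status;
+///   for (TupleRow* row : rows) {
+///     if (!stream.AddRow(row, &status)) {
+///       RETURN_IF_ERROR(status);
+///       /* out of reservation: unpin the stream or free memory, then retry */
+///     }
+///   }
+///   RETURN_IF_ERROR(stream.PrepareForRead(false, &got_reservation));
+///   // ... GetNext() until eos, then Close() ...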
+///
+/// Buffer management:
+/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses
+/// the client's reservation to pin pages in memory. It will automatically try to
+/// increase the client's reservation whenever it needs to do so to make progress.
+///
+/// The stream has both pinned and unpinned modes. In the pinned mode all pages are
+/// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned in memory
+/// and allows clients to save pointers to rows in the stream and randomly access them.
+/// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only
+/// pages currently being read and written are pinned and other pages are unpinned and
+/// therefore do not use the client's reservation and can be spilled to disk.
+///
+/// When the stream is in read/write mode, it always uses one buffer's worth of
+/// reservation for writing and at least one buffer's worth of reservation for reading,
+/// even if the same page is currently being read and written. This means that
+/// UnpinStream() always succeeds, and moving to the next write page or read page on an
+/// unpinned stream does not require additional reservation.
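+/// For example, PrepareForReadWrite() asks the buffer pool client for 2 * page_len
+/// bytes of reservation up front: one page's worth for the write iterator and one
+/// for the read iterator.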
+/// TODO: IMPALA-3208: variable-length pages will add a caveat here.
+///
+/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which deletes the stream's pages as it does a final read
+/// pass over the stream.
+///
+/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
+/// buffers to RowBatches.
+///
+/// Page layout:
+/// Rows are stored back to back starting at the first byte of each page's buffer, with
+/// no interleaving of data from different rows. There is no padding or alignment
+/// between rows.
+///
+/// Tuple row layout:
+/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a
+/// bitstring at the start of each row with null indicators for all tuples in the row
+/// (including non-nullable tuples). The bitstring occupies ceil(num_tuples_per_row / 8)
+/// bytes. A 1 indicates the tuple is null.
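+/// For example, a row with 10 tuples needs ceil(10 / 8) = 2 bytes of null
+/// indicators.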
+///
+/// The fixed length parts of the row's tuples are stored first, followed by var len data
+/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can
+/// point to var len data outside the stream. When reading the stream, the length of each
+/// row's var len data in the stream must be computed to find the next row's start.
+///
+/// The tuple stream supports reading from the stream into RowBatches without copying
+/// out any data: the RowBatches' Tuple pointers will point directly into the stream's
+/// pages' buffers. The fixed length parts follow Impala's internal tuple format, so for
+/// the tuple to be valid, we only need to update pointers to point to the var len data
+/// in the stream. These pointers need to be updated by the stream because a spilled
+/// page's data may be relocated to a different buffer. The pointers are updated lazily
+/// upon reading the stream via GetNext() or GetRows().
+///
+/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world"))
+/// with all var len data stored in the stream:
+///  <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ...
+/// +--------+-----------+-----------+-----------+-------------+
+/// | IntVal | StringVal | BigIntVal | StringVal |             | ...
+/// +--------+-----------+-----------+-----------++------------+
+/// | val: 1 | len: 5    | val: 2    | len: 5    | helloworld  | ...
+/// |        | ptr: 0x.. |           | ptr: 0x.. |             | ...
+/// +--------+-----------+-----------+-----------+-------------+
+///  <--4b--> <---12b---> <----8b---> <---12b---> <----10b---->
+///
+/// Example layout for a row with the second tuple nullable ((1, "hello"), NULL)
+/// with all var len data stored in the stream:
+/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row ...
+/// +-------------------------+--------+-----------+------------+
+/// |                         | IntVal | StringVal |            | ...
+/// +-------------------------+--------+-----------+------------+
+/// | 0000 0010               | val: 1 | len: 5    | hello      | ...
+/// |                         |        | ptr: 0x.. |            | ...
+/// +-------------------------+--------+-----------+------------+
+///  <---------1b------------> <--4b--> <---12b---> <----5b---->
+///
+/// Example layout for a row with a single non-nullable tuple (("hello", "world")) with
+/// the second string slot stored externally to the stream:
+///  <------ tuple 1 ------> <- var len ->  <- next row ...
+/// +-----------+-----------+-------------+
+/// | StringVal | StringVal |             | ...
+/// +-----------+-----------+-------------+
+/// | len: 5    | len: 5    |  hello      | ...
+/// | ptr: 0x.. | ptr: 0x.. |             | ...
+/// +-----------+-----------+-------------+
+///  <---12b---> <---12b---> <-----5b---->
+///
+/// The behavior of reads and writes is as follows:
+/// Read:
+///   1. Unpinned: Only a single read page is pinned at a time. This means that only
+///     enough reservation to pin a single page is needed to read the stream, regardless
+///     of the stream's size. Each page is deleted (if delete_on_read is true) or
+///     unpinned (otherwise) before advancing to the next page.
+///   2. Pinned: All pages in the stream are already pinned, so no pinning or
+///     unpinning is needed while reading. If delete on read is true, pages are
+///     deleted after being read.
+/// Write:
+///   1. Unpinned: Unpin pages as they fill up. This means that only enough reservation
+///     to pin a single write page is required to write to the stream, regardless of the
+///     stream's size.
+///   2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned
+///     because the caller's reservation is insufficient (and could not be increased by
+///     the stream), the write call will fail and the caller can either unpin the stream
+///     or free up other memory before retrying.
+///
+/// Memory lifetime of rows read from stream:
+/// If the stream is pinned and delete on read is false, it is valid to access any tuples
+/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is
+/// unpinned or delete on read is true, then the batch returned from GetNext() may have
+/// the needs_deep_copy flag set, which means that any tuple memory returned so far from
+/// the stream may be freed on the next call to GetNext().
+/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
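+///
+/// A read-loop sketch that honours the needs_deep_copy contract (assumes the stream
+/// was prepared with PrepareForRead() and 'batch' was built with the stream's row
+/// descriptor):
+///
+///   bool eos = false;
+///   do {
+///     RETURN_IF_ERROR(stream.GetNext(&batch, &eos));
+///     // ... consume rows, deep-copying them if needs_deep_copy is set ...
+///     batch.Reset();
+///   } while (!eos);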
+///
+/// Manual construction of rows with AllocateRow():
+/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow().
+/// AllocateRow() is called instead of AddRow() if the caller wants to manually construct
+/// a row. The caller of AllocateRow() is responsible for writing the row with exactly the
+/// layout described above.
+///
+/// If a caller constructs a tuple in this way, the caller can set the pointers and they
+/// will not be modified until the stream is read via GetNext() or GetRows().
+/// TODO: IMPALA-5007: try to remove AllocateRow() by unifying with AddRow().
+///
+/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
+/// page will need to be pinned soon.
+class BufferedTupleStreamV2 {
+ public:
+  /// A pointer to the start of a flattened TupleRow in the stream.
+  typedef uint8_t* FlatRowPtr;
+
+  /// row_desc: description of rows stored in the stream. This is the desc for rows
+  /// that are added and the rows being returned.
+  /// page_len: the size of pages to use in the stream.
+  /// TODO: IMPALA-3208: support a default and maximum page length
+  /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
+  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t page_len,
+      const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
+
+  virtual ~BufferedTupleStreamV2();
+
+  /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
+  /// once before any of the other APIs.
+  /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
+  /// 'node_id' is only used for error reporting.
+  Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for writing by attempting to allocate a write buffer. Tries to
+  /// increase reservation if there is not enough unused reservation for the buffer.
+  /// Called after Init() and before the first AddRow() or AllocateRow() call.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     first write page and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for interleaved reads and writes by allocating read and write
+  /// buffers. Called after Init() and before the first AddRow() or AllocateRow() call.
+  /// delete_on_read: Pages are deleted after they are read.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     read and write pages and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForReadWrite(
+      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for reading, invalidating the write iterator (if there is one).
+  /// Therefore must be called after the last AddRow() or AllocateRow() and before
+  /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
+  /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was previously
+  /// called with delete_on_read = true.
+  /// delete_on_read: Pages are deleted after they are read.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     first read page and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Adds a single row to the stream. There are four possible outcomes:
+  /// a) The append succeeds. True is returned.
+  /// b) The append fails because the unused reservation was not sufficient to add
+  ///   a new page to the stream and the stream could not increase the reservation
+  ///   sufficiently. Returns false and sets 'status' to OK. The append can be retried
+  ///   after freeing up memory or unpinning the stream.
+  /// c) The append fails with a runtime error. Returns false and sets 'status' to an
+  ///   error.
+  /// d) The append fails because the row is too large to fit in a page of the stream.
+  ///   Returns false and sets 'status' to an error.
+  ///
+  /// Unpinned streams avoid case b) because memory is automatically freed up by
+  /// unpinning the current write page.
+  /// TODO: IMPALA-3808: update to reflect behaviour with variable-length pages
+  ///
+  /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
+  /// returns an error, it should not be called again.
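+  ///
+  /// A retry sketch for case b) (the spilling policy is the caller's; shown here
+  /// as unpinning the stream):
+  ///   Status status;
+  ///   if (!stream.AddRow(row, &status)) {
+  ///     RETURN_IF_ERROR(status);  // Cases c) and d): a real error.
+  ///     stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+  ///     if (!stream.AddRow(row, &status)) RETURN_IF_ERROR(status);
+  ///   }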
+  bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
+
+  /// Allocates space to store a row with fixed length 'fixed_size' and variable
+  /// length data 'varlen_size'. If successful, returns the pointer where fixed length
+  /// data should be stored and assigns 'varlen_data' to where var-len data should
+  /// be stored.  AllocateRow does not currently support nullable tuples.
+  ///
+  /// The meaning of the return values is the same as for AddRow(), except failure is
+  /// indicated by returning NULL instead of false.
+  uint8_t* AllocateRow(
+      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status);
+
+  /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the
+  /// stream is pinned. The row must have been allocated with the stream's row desc.
+  /// The returned 'row' is backed by memory from the stream so is only valid as long
+  /// as the stream is pinned.
+  void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const;
+
+  /// Pins all pages in this stream and switches to pinned mode. Has no effect if the
+  /// stream is already pinned.
+  /// If the current unused reservation is not sufficient to pin the stream in memory,
+  /// this will try to increase the reservation. If that fails, 'pinned' is set to false
+  /// and the stream is left unpinned. Otherwise 'pinned' is set to true.
+  Status PinStream(bool* pinned) WARN_UNUSED_RESULT;
+
+  /// Modes for UnpinStream().
+  enum UnpinMode {
+    /// All pages in the stream are unpinned and the read/write positions in the stream
+    /// are reset. No more rows can be written to the stream after this. The stream can
+    /// be re-read from the beginning by calling PrepareForRead().
+    UNPIN_ALL,
+    /// All pages are unpinned aside from the current read and write pages (if any),
+    /// which are left in the same state. The unpinned stream can continue being read
+    /// or written from the current read or write positions.
+    UNPIN_ALL_EXCEPT_CURRENT,
+  };
+
+  /// Unpins stream with the given 'mode' as described above.
+  void UnpinStream(UnpinMode mode);
+
+  /// Get the next batch of output rows, which are backed by the stream's memory.
+  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
+  /// flag may be set on 'batch' to signal that memory will be freed on the next
+  /// call to GetNext() and that the caller should copy out any data it needs from
+  /// rows in 'batch' or in previous batches returned from GetNext().
+  ///
+  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
+  /// rows will remain valid until the stream is unpinned, destroyed, etc.
+  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
+  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
+
+  /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
+  /// each returned row in the pinned stream. The pointers in 'flat_rows' are only
+  /// valid as long as the stream remains pinned.
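+  ///
+  /// Sketch (assumes a pinned stream; 'row' must match the stream's row descriptor):
+  ///   std::vector<FlatRowPtr> flat_rows;
+  ///   RETURN_IF_ERROR(stream.GetNext(&batch, &eos, &flat_rows));
+  ///   stream.GetTupleRow(flat_rows[i], row);  // Re-materialize row i later.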
+  Status GetNext(
+      RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT;
+
+  /// Returns all the rows in the stream in 'batch'. This pins the entire stream in the
+  /// process. If the current unused reservation is not sufficient to pin the stream in
+  /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set
+  /// to false.
+  Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch,
+      bool* got_rows) WARN_UNUSED_RESULT;
+
+  /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
+  /// attaches buffers from any pinned pages to the batch and deletes unpinned
+  /// pages. Otherwise deletes all pages. Does nothing if the stream was already
+  /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching
+  /// buffers.
+  void Close(RowBatch* batch, RowBatch::FlushMode flush);
+
+  /// Number of rows in the stream.
+  int64_t num_rows() const { return num_rows_; }
+
+  /// Number of rows returned via GetNext().
+  int64_t rows_returned() const { return rows_returned_; }
+
+  /// Returns the byte size necessary to store the entire stream in memory.
+  int64_t byte_size() const { return total_byte_size_; }
+
+  /// Returns the number of bytes currently pinned in memory by the stream.
+  /// If ignore_current is true, the write_page_ memory is not included.
+  int64_t BytesPinned(bool ignore_current) const {
+    if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) {
+      return bytes_pinned_ - write_page_->len();
+    }
+    return bytes_pinned_;
+  }
+
+  bool is_closed() const { return closed_; }
+  bool is_pinned() const { return pinned_; }
+  bool has_read_iterator() const { return read_page_ != pages_.end(); }
+  bool has_write_iterator() const { return write_page_ != nullptr; }
+
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferedTupleStreamV2);
+  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
+  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
+  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
+  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
+
+  /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
+  struct Page {
+    Page() : num_rows(0) {}
+
+    inline int len() const { return handle.len(); }
+    inline uint8_t* data() const { return handle.data(); }
+    inline bool is_pinned() const { return handle.is_pinned(); }
+    inline int pin_count() const { return handle.pin_count(); }
+    std::string DebugString() const;
+
+    BufferPool::PageHandle handle;
+
+    /// Number of rows written to the page.
+    int num_rows;
+  };
+
+  /// Runtime state instance used to check for cancellation. Not owned.
+  RuntimeState* const state_;
+
+  /// Description of rows stored in the stream.
+  const RowDescriptor& desc_;
+
+  /// Sum of the fixed length portion of all the tuples in desc_, including any null
+  /// indicators.
+  int fixed_tuple_row_size_;
+
+  /// The size of the fixed length portion for each tuple in the row.
+  std::vector<int> fixed_tuple_sizes_;
+
+  /// Vectors of all the string slots that have their varlen data stored in the
+  /// stream, grouped by tuple_idx.
+  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_;
+
+  /// Vectors of all the collection slots that have their varlen data stored in the
+  /// stream, grouped by tuple_idx.
+  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_;
+
+  /// Buffer pool and client used to allocate, pin and release pages. Not owned.
+  BufferPool* buffer_pool_;
+  BufferPool::ClientHandle* buffer_pool_client_;
+
+  /// List of pages in the stream.
+  /// Empty before PrepareForWrite() is called or after the stream has been destructively
+  /// read in 'delete_on_read' mode. Non-empty otherwise.
+  std::list<Page> pages_;
+
+  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
+  /// mode.
+  int64_t total_byte_size_;
+
+  /// Iterator pointing to the current page for reading. Equal to list.end() when no
+  /// read iterator is active. GetNext() does not advance this past the end of
+  /// the stream, so upon eos 'read_page_' points to the last page and
+  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an
+  /// error status was returned.
+  std::list<Page>::iterator read_page_;
+
+  /// Number of rows returned from the current read_page_.
+  uint32_t read_page_rows_returned_;
+
+  /// Pointer into read_page_ to the byte after the last row read.
+  uint8_t* read_ptr_;
+
+  /// Pointer into write_page_ to the byte after the last row written.
+  uint8_t* write_ptr_;
+
+  /// Pointer to one byte past the end of write_page_. Cached to speed up computation.
+  uint8_t* write_end_ptr_;
+
+  /// Number of rows returned to the caller from GetNext() since the last
+  /// PrepareForRead() call.
+  int64_t rows_returned_;
+
+  /// The current page for writing. NULL if there is no available page to write to.
+  /// Always pinned. If 'read_page_' and 'write_page_' reference the same page, then
+  /// that page is only pinned once.
+  Page* write_page_;
+
+  /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
+  /// to compute it.
+  int64_t bytes_pinned_;
+
+  /// Number of rows stored in the stream. Includes rows that were already deleted during
+  /// a destructive 'delete_on_read' pass over the stream.
+  int64_t num_rows_;
+
+  /// The length in bytes of pages used to store the stream's rows.
+  /// TODO: IMPALA-3808: support variable-length pages
+  const int64_t page_len_;
+
+  /// Whether any tuple in the rows is nullable.
+  const bool has_nullable_tuple_;
+
+  /// If true, pages are deleted after they are read.
+  bool delete_on_read_;
+
+  bool closed_; // Used for debugging.
+
+  /// If true, this stream has been explicitly pinned by the caller and all pages are
+  /// kept pinned until the caller calls UnpinStream().
+  bool pinned_;
+
+  bool is_read_page(const Page* page) const {
+    return has_read_iterator() && &*read_page_ == page;
+  }
+
+  bool is_write_page(const Page* page) const { return write_page_ == page; }
+
+  /// The slow path for AddRow() that is called if there is not sufficient space in
+  /// the current page.
+  bool AddRowSlow(TupleRow* row, Status* status) noexcept;
+
+  /// The slow path for AllocateRow() that is called if there is not sufficient space in
+  /// the current page.
+  uint8_t* AllocateRowSlow(
+      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept;
+
+  /// Copies 'row' into write_page_. Returns false if there is not enough space in
+  /// 'write_page_'. After returning false, write_ptr_ may be left pointing to the
+  /// partially-written row, and no more data can be written to write_page_.
+  template <bool HAS_NULLABLE_TUPLE>
+  bool DeepCopyInternal(TupleRow* row) noexcept;
+
+  /// Helper function to copy strings in string_slots from tuple into write_page_.
+  /// Updates write_ptr_ to the end of the string data added. Returns false if the data
+  /// does not fit in the current write page. After returning false, write_ptr_ is left
+  /// pointing to the partially-written row, and no more data can be written to
+  /// write_page_.
+  bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots);
+
+  /// Helper function to deep copy collections in collection_slots from tuple into
+  /// write_page_. Updates write_ptr_ to the end of the collection data added. Returns
+  /// false if the data does not fit in the current write page. After returning false,
+  /// write_ptr_ is left pointing to the partially-written row, and no more data can be
+  /// written to write_page_.
+  bool CopyCollections(
+      const Tuple* tuple, const std::vector<SlotDescriptor*>& collection_slots);
+
+  /// Wrapper of the templated DeepCopyInternal() function.
+  bool DeepCopy(TupleRow* row) noexcept;
+
+  /// Gets a new page of 'page_len_' bytes from buffer_pool_, updating write_page_,
+  /// write_ptr_ and write_end_ptr_. The caller must ensure there is sufficient unused
+  /// reservation to allocate the page. The caller must reset the write iterator (if
+  /// there is one).
+  Status NewWritePage() noexcept WARN_UNUSED_RESULT;
+
+  /// Validates that a page can fit a row of 'row_size' bytes.
+  /// Returns an error if the row cannot fit in a page.
+  Status CheckPageSizeForRow(int64_t row_size);
+
+  /// Wrapper around NewWritePage() that allocates a new write page that fits a row of
+  /// 'row_size' bytes. Increases reservation if needed to allocate the next page.
+  /// Returns OK and sets 'got_reservation' to true if the write page was successfully
+  /// allocated. Returns an error if the row cannot fit in a page. Returns OK and sets
+  /// 'got_reservation' to false if the reservation could not be increased and no other
+  /// error was encountered.
+  Status AdvanceWritePage(
+      int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;
+
+  /// Reset the write page, if there is one, and unpin pages accordingly.
+  void ResetWritePage();
+
+  /// Same as PrepareForRead(), except the iterators are not invalidated and
+  /// the caller is assumed to have checked there is sufficient unused reservation.
+  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
+
+  /// Pins the next read page. This blocks reading from disk if necessary to bring the
+  /// page's data into memory. Updates read_page_, read_ptr_, and
+  /// read_page_rows_returned_.
+  Status NextReadPage() WARN_UNUSED_RESULT;
+
+  /// Reset the read page, if there is one, and unpin pages accordingly.
+  void ResetReadPage();
+
+  /// Returns the total additional bytes that this row will consume in write_page_ if
+  /// appended to the page. This includes the row's null indicators, the fixed length
+  /// part of the row and the data for inlined_string_slots_ and inlined_coll_slots_.
+  int64_t ComputeRowSize(TupleRow* row) const noexcept;
+
+  /// Pins page and updates tracking stats.
+  Status PinPage(Page* page) WARN_UNUSED_RESULT;
+
+  /// Increment the page's pin count if this page needs a higher pin count given the
+  /// current read and write iterator positions and whether the stream will be pinned
+  /// ('stream_pinned'). Assumes there is no scenario in which the pin count needs
+  /// to be incremented more than once. The caller is responsible for ensuring
+  /// sufficient reservation is available.
+  Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT;
+
+  /// Decrement the page's pin count if this page needs a lower pin count given the
+  /// current read and write iterator positions and whether the stream will be pinned
+  /// ('stream_pinned'). Assumes there is no scenario in which the pin count needs
+  /// to be decremented more than once.
+  void UnpinPageIfNeeded(Page* page, bool stream_pinned);
+
+  /// Return the expected pin count for 'page' in the current stream based on the current
+  /// read and write pages and whether the stream is pinned.
+  int ExpectedPinCount(bool stream_pinned, const Page* page) const;
+
+  /// Templated GetNext implementations.
+  template <bool FILL_FLAT_ROWS>
+  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+  template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
+  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+
+  /// Helper function to convert a flattened TupleRow stored starting at '*data' into
+  /// 'row'. *data is updated to point to the first byte past the end of the row.
+  template <bool HAS_NULLABLE_TUPLE>
+  void UnflattenTupleRow(uint8_t** data, TupleRow* row) const;
+
+  /// Helper function for GetNextInternal(). For each string slot in string_slots,
+  /// update StringValue's ptr field to point to the corresponding string data stored
+  /// inline in the stream (at the current value of read_ptr_) and advance read_ptr_
+  /// by the StringValue's len field.
+  void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple);
+
+  /// Helper function for GetNextInternal(). For each collection slot in collection_slots,
+  /// recursively update any pointers in the CollectionValue to point to the corresponding
+  /// var len data stored inline in the stream, advancing read_ptr_ as data is read.
+  /// Assumes that the collection was serialized to the stream in DeepCopy()'s format.
+  void FixUpCollectionsForRead(
+      const vector<SlotDescriptor*>& collection_slots, Tuple* tuple);
+
+  /// Returns the number of null indicator bytes per row. Only valid if this stream has
+  /// nullable tuples.
+  int NullIndicatorBytesPerRow() const;
+
+  /// Returns the total bytes pinned. Only called in DCHECKs to validate bytes_pinned_.
+  int64_t CalcBytesPinned() const;
+
+  /// DCHECKs if the stream is internally inconsistent. The stream should always be in
+  /// a consistent state after returning success from a public API call.
+  void CheckConsistency() const;
+};
+}
+
+#endif
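
The members above spell out the stream's lifecycle: rows are appended via AddRow() until
reservation runs out, and a read pass is then started with PrepareForRead() and driven
by GetNext(). A minimal sketch of that lifecycle, with the PrepareForWrite(),
PrepareForRead() and GetNext() signatures inferred from the comments above rather than
copied from the full header:

    // Sketch only: public signatures and PAGE_LEN are assumptions for illustration.
    BufferedTupleStreamV2 stream(state, row_desc, client, PAGE_LEN, ext_varlen_slots);
    RowBatch batch(row_desc, state->batch_size(), mem_tracker);
    bool got_reservation = false;
    RETURN_IF_ERROR(stream.PrepareForWrite(&got_reservation));
    DCHECK(got_reservation);
    for (TupleRow* row : rows) {
      Status status;
      // AddRow() returns false on error ('status' is set) or when no reservation
      // is available for a new write page.
      if (!stream.AddRow(row, &status)) RETURN_IF_ERROR(status);
    }
    // Destructive read pass: pages are deleted as they are consumed.
    RETURN_IF_ERROR(stream.PrepareForRead(/*delete_on_read=*/true, &got_reservation));
    bool eos = false;
    do {
      RETURN_IF_ERROR(stream.GetNext(&batch, &eos));
      // ... consume 'batch' ...
      batch.Reset();
    } while (!eos);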

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h
new file mode 100644
index 0000000..6ad4bc4
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
+#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
+
+#include "runtime/buffered-tuple-stream-v2.h"
+
+#include "runtime/descriptors.h"
+#include "runtime/tuple-row.h"
+#include "util/bit-util.h"
+
+namespace impala {
+
+inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
+  DCHECK(has_nullable_tuple_);
+  return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
+}
+
+inline bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {
+  DCHECK(!closed_);
+  if (LIKELY(DeepCopy(row))) return true;
+  return AddRowSlow(row, status);
+}
+
+inline uint8_t* BufferedTupleStreamV2::AllocateRow(
+    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) {
+  DCHECK(!closed_);
+  DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples";
+  const int total_size = fixed_size + varlen_size;
+  if (UNLIKELY(write_page_ == nullptr || write_ptr_ + total_size > write_end_ptr_)) {
+    return AllocateRowSlow(fixed_size, varlen_size, varlen_data, status);
+  }
+  DCHECK(write_page_ != nullptr);
+  DCHECK(write_page_->is_pinned());
+  DCHECK_LE(write_ptr_ + total_size, write_end_ptr_);
+  ++num_rows_;
+  ++write_page_->num_rows;
+
+  uint8_t* fixed_data = write_ptr_;
+  write_ptr_ += fixed_size;
+  *varlen_data = write_ptr_;
+  write_ptr_ += varlen_size;
+  return fixed_data;
+}
+}
+
+#endif
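
AllocateRow()'s fast path carves 'fixed_size + varlen_size' contiguous bytes off the
current write page: the fixed-length tuple data first, with the variable-length data
placed immediately after it. A hypothetical caller writing one fixed-length tuple plus
its string bytes might look like the fragment below (the StringValue fix-up that points
the slot at 'varlen_data' is elided):

    Status status;
    uint8_t* varlen_data;
    uint8_t* fixed_data = stream->AllocateRow(
        tuple_desc->byte_size(), str.size(), &varlen_data, &status);
    if (fixed_data == nullptr) return status;  // the row did not fit in any page
    memset(fixed_data, 0, tuple_desc->byte_size());  // clear the null indicators
    memcpy(varlen_data, str.data(), str.size());     // varlen bytes follow the tuple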

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 0e0d384..d4640ac 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -44,7 +44,10 @@ namespace impala {
 
 class BufferPoolTest : public ::testing::Test {
  public:
-  virtual void SetUp() { test_env_ = obj_pool_.Add(new TestEnv); }
+  virtual void SetUp() {
+    test_env_ = obj_pool_.Add(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
 
   virtual void TearDown() {
     for (auto entry : query_reservations_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 850c90b..611520c 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,6 +27,8 @@
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "runtime/backend-client.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
@@ -42,6 +44,7 @@
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/bit-util.h"
 #include "util/debug-util.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
@@ -148,6 +151,8 @@ ExecEnv::ExecEnv()
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    buffer_reservation_(nullptr),
+    buffer_pool_(nullptr),
     enable_webserver_(FLAGS_enable_webserver),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -202,6 +207,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    buffer_reservation_(nullptr),
+    buffer_pool_(nullptr),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -229,7 +236,10 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
   exec_env_ = this;
 }
 
-ExecEnv::~ExecEnv() {}
+ExecEnv::~ExecEnv() {
+  if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
+  disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
+}
 
 Status ExecEnv::InitForFeTests() {
   mem_tracker_.reset(new MemTracker(-1, "Process"));
@@ -273,18 +283,6 @@ Status ExecEnv::StartServices() {
   if (bytes_limit < 0) {
     return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
   }
-  // Minimal IO Buffer requirements:
-  //   IO buffer (8MB default) * number of IO buffers per thread (5) *
-  //   number of threads per core * number of cores
-  int64_t min_requirement = disk_io_mgr_->max_read_buffer_size() *
-      DiskIoMgr::DEFAULT_QUEUE_CAPACITY *
-      FLAGS_num_threads_per_core * FLAGS_num_cores;
-  if (bytes_limit < min_requirement) {
-    LOG(WARNING) << "Memory limit "
-                 << PrettyPrinter::Print(bytes_limit, TUnit::BYTES)
-                 << " does not meet minimal memory requirement of "
-                 << PrettyPrinter::Print(min_requirement, TUnit::BYTES);
-  }
 
   metrics_->Init(enable_webserver_ ? webserver_.get() : NULL);
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
@@ -353,4 +351,10 @@ Status ExecEnv::StartServices() {
   return Status::OK();
 }
 
+void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
+  DCHECK(buffer_pool_ == nullptr);
+  buffer_pool_.reset(new BufferPool(min_page_size, capacity));
+  buffer_reservation_.reset(new ReservationTracker());
+  buffer_reservation_->InitRootTracker(NULL, capacity);
+}
 }
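
InitBufferPool() is intentionally minimal: it creates the pool plus a root
ReservationTracker whose limit equals the pool capacity. Because only TestEnv is
declared a friend of ExecEnv (see the exec-env.h hunk below), a backend test reaches it
roughly as follows; this TestEnv::Init() body is a hypothetical sketch, only its
existence is established by the buffer-pool-test.cc hunk above:

    Status TestEnv::Init() {
      // 64KB minimum pages and a 4GB cap; values chosen purely for illustration.
      exec_env_->InitBufferPool(64 * 1024, 4L * 1024 * 1024 * 1024);
      // Tests then carve client reservations out of exec_env_->buffer_reservation()
      // and allocate from exec_env_->buffer_pool().
      return Status::OK();
    }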

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index bdeb4a4..a5777ef 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class BufferPool;
 class CallableThreadPool;
 class DataStreamMgr;
 class DiskIoMgr;
@@ -42,6 +43,7 @@ class PoolMemTrackerRegistry;
 class MetricGroup;
 class QueryResourceMgr;
 class RequestPoolService;
+class ReservationTracker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
@@ -65,8 +67,7 @@ class ExecEnv {
   /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
 
-  /// Empty destructor because the compiler-generated one requires full
-  /// declarations for classes in scoped_ptrs.
+  /// Destructor - only used in backend tests that create a new environment per test.
   virtual ~ExecEnv();
 
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
@@ -99,6 +100,8 @@ class ExecEnv {
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
+  ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
+  BufferPool* buffer_pool() { return buffer_pool_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -143,12 +146,20 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
 
+  /// Process-wide buffer pool and the root reservation tracker for the pool. The
+  /// root tracker's reservation limit is equal to the maximum capacity of the pool.
+  /// For now these are only used by backend tests that create them via InitBufferPool().
+  boost::scoped_ptr<ReservationTracker> buffer_reservation_;
+  boost::scoped_ptr<BufferPool> buffer_pool_;
+
   /// Not owned by this class
   ImpalaServer* impala_server_;
 
   bool enable_webserver_;
 
  private:
+  friend class TestEnv;
+
   static ExecEnv* exec_env_;
   bool is_fe_tests_;
 
@@ -157,6 +168,9 @@ class ExecEnv {
 
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;
+
+  /// Initialize 'buffer_pool_' and 'buffer_reservation_' with the given capacity.
+  void InitBufferPool(int64_t min_page_size, int64_t capacity);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2b2e3a0..b931808 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #include "runtime/query-state.h"
 
-#include <boost/thread/locks.hpp>
 #include <boost/thread/lock_guard.hpp>
+#include <boost/thread/locks.hpp>
 
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
+#include "util/debug-util.h"
 
 #include "common/names.h"
 
@@ -41,7 +43,12 @@ QueryState::ScopedRef::~ScopedRef() {
 }
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
-  : query_ctx_(query_ctx), refcnt_(0), prepared_(false), released_resources_(false) {
+  : query_ctx_(query_ctx),
+    refcnt_(0),
+    prepared_(false),
+    released_resources_(false),
+    buffer_reservation_(nullptr),
+    file_group_(nullptr) {
   TQueryOptions& query_options = query_ctx_.client_request.query_options;
   // max_errors does not indicate how many errors in total have been recorded, but rather
   // how many are distinct. It is defined as the sum of the number of generic errors and
@@ -56,8 +63,12 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
 }
 
 void QueryState::ReleaseResources() {
+  // Clean up temporary files.
+  if (file_group_ != nullptr) file_group_->Close();
+  // Release any remaining reservation.
+  if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   // Avoid dangling reference from the parent of 'query_mem_tracker_'.
-  query_mem_tracker_->UnregisterFromParent();
+  if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
   released_resources_ = true;
 }
 
@@ -77,19 +88,27 @@ Status QueryState::Prepare() {
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
-  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
   if (process_mem_tracker->LimitExceeded()) {
     string msg = Substitute("Query $0 could not start because the backend Impala daemon "
                             "is over its memory limit",
         PrintId(query_id()));
-    prepare_status_ = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
-    return prepare_status_;
+    status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
+    goto error;
+  }
+  // Do buffer-pool-related setup if running in a backend test that explicitly created
+  // the pool.
+  if (exec_env->buffer_pool() != nullptr) {
+    status = InitBufferPoolState();
+    if (!status.ok()) goto error;
   }
-
-  // TODO: IMPALA-3748: acquire minimum buffer reservation at this point.
-
   prepared_ = true;
   return Status::OK();
+
+error:
+  prepare_status_ = status;
+  return status;
 }
 
 void QueryState::InitMemTrackers(const std::string& pool) {
@@ -103,6 +122,37 @@ void QueryState::InitMemTrackers(const std::string& pool) {
       MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
 }
 
+Status QueryState::InitBufferPoolState() {
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  int64_t query_mem_limit = query_mem_tracker_->limit();
+  if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max();
+
+  // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from
+  // query_mem_limit.
+  int64_t max_reservation = numeric_limits<int64_t>::max();
+  if (query_options().__isset.max_block_mgr_memory
+      && query_options().max_block_mgr_memory > 0) {
+    max_reservation = query_options().max_block_mgr_memory;
+  }
+
+  // TODO: IMPALA-3748: claim the query-wide minimum reservation.
+  // For now, rely on exec nodes to grab their minimum reservation during Prepare().
+  buffer_reservation_ = obj_pool_.Add(new ReservationTracker);
+  buffer_reservation_->InitChildTracker(
+      NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
+
+  // TODO: once there's a mechanism for reporting non-fragment-local profiles, make
+  // sure to report this profile so that it does not go into a black hole.
+  RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy"));
+  // Only create file group if spilling is enabled.
+  if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
+    file_group_ = obj_pool_.Add(
+        new TmpFileMgr::FileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
+            dummy_profile, query_id(), query_options().scratch_limit));
+  }
+  return Status::OK();
+}
+
 void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
   VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
   lock_guard<SpinLock> l(fis_map_lock_);
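
InitBufferPoolState() links the query-level ReservationTracker to the process-wide root
created by ExecEnv::InitBufferPool(), and to the query MemTracker so that reservations
are charged against the query's memory limit. An exec node would then typically create
its own child tracker one level further down, e.g. (a sketch; IncreaseReservation() is
assumed from the ReservationTracker API):

    // Hypothetical per-operator reservation beneath the query-level tracker.
    ReservationTracker* node_tracker = obj_pool->Add(new ReservationTracker);
    node_tracker->InitChildTracker(
        profile, query_state->buffer_reservation(), node_mem_tracker, node_limit);
    if (!node_tracker->IncreaseReservation(min_reservation)) {
      return Status("Failed to acquire the operator's minimum buffer reservation");
    }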

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index c381dfe..650e8bf 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -26,6 +26,7 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "runtime/tmp-file-mgr.h"
 #include "util/spinlock.h"
 #include "util/uid-util.h"
 
@@ -33,6 +34,7 @@ namespace impala {
 
 class FragmentInstanceState;
 class MemTracker;
+class ReservationTracker;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -94,6 +96,8 @@ class QueryState {
   }
 
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
+  ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
+  TmpFileMgr::FileGroup* file_group() const { return file_group_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
   /// if resources could not be acquired. Safe to call concurrently and idempotent:
@@ -145,12 +149,26 @@ class QueryState {
   /// The top-level MemTracker for this query (owned by obj_pool_).
   MemTracker* query_mem_tracker_;
 
+  /// Buffer reservation for this query (owned by obj_pool_). Only non-null in
+  /// backend tests that explicitly enabled the new buffer pool.
+  /// TODO: this will always be non-null once IMPALA-3200 is done.
+  ReservationTracker* buffer_reservation_;
+
+  /// Temporary files for this query (owned by obj_pool_). Only non-null in backend
+  /// tests that explicitly enabled the new buffer pool.
+  /// TODO: this will always be non-null once IMPALA-3200 is done.
+  TmpFileMgr::FileGroup* file_group_;
+
   /// Create QueryState w/ copy of query_ctx and refcnt of 0.
   /// The query is associated with the resource pool named 'pool'
   QueryState(const TQueryCtx& query_ctx, const std::string& pool);
 
   /// Called from Prepare() to initialize MemTrackers.
   void InitMemTrackers(const std::string& pool);
+
+  /// Called from Prepare() to set up buffer reservations and the file group.
+  /// Fails if required resources are not available.
+  Status InitBufferPoolState();
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 6daa02f..8fb0b55 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -17,17 +17,18 @@
 
 #include "runtime/row-batch.h"
 
-#include <stdint.h>  // for intptr_t
+#include <stdint.h> // for intptr_t
 #include <boost/scoped_ptr.hpp>
 
+#include "gen-cpp/Results_types.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/compress.h"
-#include "util/decompress.h"
 #include "util/debug-util.h"
+#include "util/decompress.h"
 #include "util/fixed-size-hash-table.h"
-#include "gen-cpp/Results_types.h"
 
 #include "common/names.h"
 
@@ -157,6 +158,10 @@ RowBatch::~RowBatch() {
   for (int i = 0; i < blocks_.size(); ++i) {
     blocks_[i]->Delete();
   }
+  for (BufferInfo& buffer_info : buffers_) {
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer_info.client, &buffer_info.buffer);
+  }
   if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
     DCHECK(tuple_ptrs_ != NULL);
     free(tuple_ptrs_);
@@ -305,6 +310,16 @@ void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
   if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
 }
 
+void RowBatch::AddBuffer(
+    BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush) {
+  auxiliary_mem_usage_ += buffer.len();
+  BufferInfo buffer_info;
+  buffer_info.client = client;
+  buffer_info.buffer = std::move(buffer);
+  buffers_.push_back(std::move(buffer_info));
+  if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
+}
+
 void RowBatch::Reset() {
   num_rows_ = 0;
   capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
@@ -318,6 +333,11 @@ void RowBatch::Reset() {
     blocks_[i]->Delete();
   }
   blocks_.clear();
+  for (BufferInfo& buffer_info : buffers_) {
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer_info.client, &buffer_info.buffer);
+  }
+  buffers_.clear();
   auxiliary_mem_usage_ = 0;
   if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
     tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
@@ -336,6 +356,11 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
     dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
   }
   blocks_.clear();
+  for (BufferInfo& buffer_info : buffers_) {
+    dest->AddBuffer(
+        buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
+  }
+  buffers_.clear();
   if (needs_deep_copy_) {
     dest->MarkNeedsDeepCopy();
   } else if (flush_ == FlushMode::FLUSH_RESOURCES) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 91433c4..0bb71d8 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -15,18 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_RUNTIME_ROW_BATCH_H
 #define IMPALA_RUNTIME_ROW_BATCH_H
 
-#include <vector>
 #include <cstring>
+#include <vector>
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
-#include "runtime/buffered-block-mgr.h" // for BufferedBlockMgr::Block
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
@@ -208,6 +208,7 @@ class RowBatch {
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
   int num_io_buffers() const { return io_buffers_.size(); }
   int num_blocks() const { return blocks_.size(); }
+  int num_buffers() const { return buffers_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.
   void Reset();
@@ -220,10 +221,18 @@ class RowBatch {
   /// the original owner, even when the ownership of batches is transferred. If the
   /// original owner wants the memory to be released, it should call this with 'mode'
   /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
-  /// TODO: after IMPALA-3200, make the ownership transfer model consistent between
-  /// Blocks and I/O buffers.
   void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
 
+  /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
+  /// The buffer's memory remains accounted against the original owner, even when the
+  /// ownership of batches is transferred. If the original owner wants the memory to be
+  /// released, it should call this with 'flush' FLUSH_RESOURCES (see MarkFlushResources()
+  /// for further explanation).
+  /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
+  /// make it consistent between buffers and I/O buffers.
+  void AddBuffer(
+      BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush);
+
   /// Used by an operator to indicate that it cannot produce more rows until the
   /// resources that it has attached to the row batch are freed or acquired by an
   /// ancestor operator. After this is called, the batch is at capacity and no more rows
@@ -424,11 +433,19 @@ class RowBatch {
   /// are owned by the BufferedBlockMgr.
   std::vector<BufferedBlockMgr::Block*> blocks_;
 
+  struct BufferInfo {
+    BufferPool::ClientHandle* client;
+    BufferPool::BufferHandle buffer;
+  };
+
+  /// Buffers attached to this row batch. See AddBuffer() for ownership semantics.
+  std::vector<BufferInfo> buffers_;
+
   /// String to write compressed tuple data to in Serialize().
   /// This is a string so we can swap() with the string in the TRowBatch we're serializing
   /// to (we don't compress directly into the TRowBatch in case the compressed data is
-  /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and
-  /// avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and
+  /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch
+  /// and avoids excess memory allocations: since we reuse RowBatches and TRowBatches, and
   /// assuming all row batches are roughly the same size, all strings will eventually be
   /// allocated to the right size.
   std::string compression_scratch_;
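
Together with the row-batch.cc hunks above, AddBuffer() gives buffer pool clients the
same attach-and-transfer model that blocks and I/O buffers already have: the batch
frees an attached buffer in Reset() or in its destructor via BufferPool::FreeBuffer().
A sketch of a producer attaching a buffer; AllocateBuffer()'s exact signature is an
assumption here:

    BufferPool::BufferHandle buffer;
    RETURN_IF_ERROR(pool->AllocateBuffer(client, len, &buffer));
    // ... copy the rows referenced by 'batch' into 'buffer' ...
    batch->AddBuffer(client, std::move(buffer), RowBatch::FlushMode::FLUSH_RESOURCES);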

