impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From he...@apache.org
Subject [2/2] incubator-impala git commit: IMPALA-5085: large rows in BufferedTupleStreamV2
Date Sat, 17 Jun 2017 22:38:13 GMT
IMPALA-5085: large rows in BufferedTupleStreamV2

The stream defaults to pages of default_page_len_. If a row doesn't
fit in that page, it will allocate another page up to max_page_len_
bytes and append a single row to that page, then immediately unpin
the page. This means that when writing a stream, the large
page only needs to be kept in memory temporarily, which helps with
memory requirements.  E.g. consider a hash join that is repartitioning
1 unpinned stream into 16 unpinned streams. We will need
default_page_len_ * 15 + max_page_len_ * 2 bytes of reservation because
when processing a large row we only need one large write buffer at a
time.

Also switches the stream to lazily allocating write pages, so that
we don't need to allocate a page until we know the size of the row
to go in it. This required a mechanism to "save" reservation in
PrepareForRead()/PrepareForWrite(). A SubReservation APi is added
to BufferPool for this purpose and the stream now saves read and
write reservation for lazy page allocation. It also saves reservation
instead of double-pinning pages in the read/write case.

The large row cases are not as optimised for memory consumption or
performance - queries processing very large numbers of large rows
are an extreme edge case that is likely to hit other performance
bottlenecks first. Pages with large rows can have up to 50%
internal fragmentation.

To avoid duplicating more logic between AddRow() and AllocateRow()
I restructured things so that AddRowSlow() is implemented in terms
of AllocateRowSlow(). AllocateRow() now takes a function as an
argument to populate the row.

Testing:
* Added tests for the case where 0 rows are added to the stream
* Extend BigRow to exercise the new code.
* Also test large strings and read/write streams.

Change-Id: I2861c58efa7bc1aeaa5b7e2f043c97cb3985c8f5
Reviewed-on: http://gerrit.cloudera.org:8080/6638
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a3ce5b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a3ce5b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a3ce5b44

Branch: refs/heads/master
Commit: a3ce5b44889b46c7c3995bfa0cfdb620a1464ae8
Parents: bd6d2df
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Thu Apr 13 14:47:51 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Sat Jun 17 10:08:29 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 377 ++++++++++---
 be/src/runtime/buffered-tuple-stream-v2.cc      | 530 +++++++++++++------
 be/src/runtime/buffered-tuple-stream-v2.h       | 274 ++++++----
 .../runtime/buffered-tuple-stream-v2.inline.h   |  28 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   |  33 ++
 be/src/runtime/bufferpool/buffer-pool.cc        |  34 ++
 be/src/runtime/bufferpool/buffer-pool.h         |  35 ++
 .../bufferpool/reservation-tracker-test.cc      |   2 +-
 be/src/runtime/bufferpool/reservation-tracker.h |   4 +-
 common/thrift/generate_error_codes.py           |   4 +
 10 files changed, 977 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index 1ae9181..37ddad7 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -86,6 +86,26 @@ class SimpleTupleStreamTest : public testing::Test {
     string_builder.DeclareTuple() << TYPE_STRING;
     string_desc_ =
         pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples));
+
+    // Construct descriptors for big rows with and without nullable tuples.
+    // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator.
+    DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_);
+    tuple_ids.clear();
+    nullable_tuples.clear();
+    vector<bool> non_nullable_tuples;
+    const int num_tuples = BIG_ROW_BYTES / (8 * sizeof(int) + 1);
+    for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+      big_row_builder.DeclareTuple() << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT
+                                     << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT;
+      tuple_ids.push_back(static_cast<TTupleId>(tuple_idx));
+      nullable_tuples.push_back(true);
+      non_nullable_tuples.push_back(false);
+    }
+    big_row_desc_ = pool_.Add(
+        new RowDescriptor(*big_row_builder.Build(), tuple_ids, non_nullable_tuples));
+    ASSERT_FALSE(big_row_desc_->IsAnyTupleNullable());
+    nullable_big_row_desc_ = pool_.Add(
+        new RowDescriptor(*big_row_builder.Build(), tuple_ids, nullable_tuples));
   }
 
   virtual void TearDown() {
@@ -294,8 +314,12 @@ class SimpleTupleStreamTest : public testing::Test {
   // Assumes that enough buffers are available to read and write the stream.
   template <typename T>
   void TestValues(int num_batches, RowDescriptor* desc, bool gen_null, bool unpin_stream,
-      int64_t page_len = PAGE_LEN, int num_rows = BATCH_SIZE) {
-    BufferedTupleStreamV2 stream(runtime_state_, *desc, &client_, page_len);
+      int64_t default_page_len = PAGE_LEN, int64_t max_page_len = -1,
+      int num_rows = BATCH_SIZE) {
+    if (max_page_len == -1) max_page_len = default_page_len;
+
+    BufferedTupleStreamV2 stream(
+        runtime_state_, *desc, &client_, default_page_len, max_page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_write_reservation;
     ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -339,7 +363,8 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
       bool unpin_stream, int64_t page_len = PAGE_LEN) {
-    BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, page_len);
+    BufferedTupleStreamV2 stream(
+        runtime_state_, *int_desc_, &client_, page_len, page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_reservation;
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
@@ -374,6 +399,11 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestTransferMemory(bool pinned_stream, bool read_write);
 
+  // Helper to writes 'row' comprised of only string slots to 'data'. The expected
+  // length of the data written is 'expected_len'.
+  void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
+      int64_t varlen_size, uint8_t* data);
+
   // The temporary runtime environment used for the test.
   scoped_ptr<TestEnv> test_env_;
   RuntimeState* runtime_state_;
@@ -387,6 +417,10 @@ class SimpleTupleStreamTest : public testing::Test {
   ObjectPool pool_;
   RowDescriptor* int_desc_;
   RowDescriptor* string_desc_;
+
+  static const int64_t BIG_ROW_BYTES = 16 * 1024;
+  RowDescriptor* big_row_desc_;
+  RowDescriptor* nullable_big_row_desc_;
   scoped_ptr<MemPool> mem_pool_;
 };
 
@@ -503,23 +537,29 @@ class ArrayTupleStreamTest : public SimpleTupleStreamTest {
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleTupleStreamTest, Basic) {
   Init(numeric_limits<int64_t>::max());
+  TestValues<int>(0, int_desc_, false, true);
   TestValues<int>(1, int_desc_, false, true);
   TestValues<int>(10, int_desc_, false, true);
   TestValues<int>(100, int_desc_, false, true);
+  TestValues<int>(0, int_desc_, false, false);
   TestValues<int>(1, int_desc_, false, false);
   TestValues<int>(10, int_desc_, false, false);
   TestValues<int>(100, int_desc_, false, false);
 
+  TestValues<StringValue>(0, string_desc_, false, true);
   TestValues<StringValue>(1, string_desc_, false, true);
   TestValues<StringValue>(10, string_desc_, false, true);
   TestValues<StringValue>(100, string_desc_, false, true);
+  TestValues<StringValue>(0, string_desc_, false, false);
   TestValues<StringValue>(1, string_desc_, false, false);
   TestValues<StringValue>(10, string_desc_, false, false);
   TestValues<StringValue>(100, string_desc_, false, false);
 
+  TestIntValuesInterleaved(0, 1, true);
   TestIntValuesInterleaved(1, 1, true);
   TestIntValuesInterleaved(10, 5, true);
   TestIntValuesInterleaved(100, 15, true);
+  TestIntValuesInterleaved(0, 1, false);
   TestIntValuesInterleaved(1, 1, false);
   TestIntValuesInterleaved(10, 5, false);
   TestIntValuesInterleaved(100, 15, false);
@@ -530,9 +570,11 @@ TEST_F(SimpleTupleStreamTest, OneBufferSpill) {
   // Each buffer can only hold 128 ints, so this spills quite often.
   int buffer_size = 128 * sizeof(int);
   Init(buffer_size);
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
 
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
 }
@@ -542,13 +584,16 @@ TEST_F(SimpleTupleStreamTest, ManyBufferSpill) {
   int buffer_size = 128 * sizeof(int);
   Init(10 * buffer_size);
 
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
   TestValues<int>(100, int_desc_, false, true, buffer_size);
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
 
+  TestIntValuesInterleaved(0, 1, true, buffer_size);
   TestIntValuesInterleaved(1, 1, true, buffer_size);
   TestIntValuesInterleaved(10, 5, true, buffer_size);
   TestIntValuesInterleaved(100, 15, true, buffer_size);
@@ -560,7 +605,8 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   Init(num_buffers * buffer_size);
   RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_;
 
-  BufferedTupleStreamV2 stream(runtime_state_, *row_desc, &client_, buffer_size);
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *row_desc, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, true));
   if (read_write) {
     bool got_reservation = false;
@@ -654,7 +700,8 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
   int buffer_size = 4 * 1024;
   Init(100 * buffer_size);
 
-  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, buffer_size);
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *int_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, pin_stream));
   if (read_write) {
     bool got_reservation;
@@ -667,9 +714,9 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
   }
   RowBatch* batch = CreateIntBatch(0, 1024, false);
 
-  // Construct a stream with 4 blocks.
-  const int total_num_buffers = 4;
-  while (stream.byte_size() < total_num_buffers * buffer_size) {
+  // Construct a stream with 4 pages.
+  const int total_num_pages = 4;
+  while (stream.byte_size() < total_num_pages * buffer_size) {
     Status status;
     for (int i = 0; i < batch->num_rows(); ++i) {
       bool ret = stream.AddRow(batch->GetRow(i), &status);
@@ -679,19 +726,26 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
   }
 
   batch->Reset();
+
+  if (read_write) {
+    // Read back batch so that we have a read buffer in memory.
+    bool eos;
+    ASSERT_OK(stream.GetNext(batch, &eos));
+    EXPECT_FALSE(eos);
+  }
   stream.Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
   if (pin_stream) {
-    DCHECK_EQ(total_num_buffers, batch->num_buffers());
+    EXPECT_EQ(total_num_pages, batch->num_buffers());
   } else if (read_write) {
-    // Read and write block should be attached.
-    DCHECK_EQ(2, batch->num_buffers());
+    // Read and write buffer should be attached.
+    EXPECT_EQ(2, batch->num_buffers());
   } else {
-    // Read block should be attached.
-    DCHECK_EQ(1, batch->num_buffers());
+    // Read buffer should be attached.
+    EXPECT_EQ(1, batch->num_buffers());
   }
-  DCHECK(batch->AtCapacity()); // Flush resources flag should have been set.
+  EXPECT_TRUE(batch->AtCapacity()); // Flush resources flag should have been set.
   batch->Reset();
-  DCHECK_EQ(0, batch->num_buffers());
+  EXPECT_EQ(0, batch->num_buffers());
 }
 
 /// Test attaching memory to a row batch from a pinned stream.
@@ -730,19 +784,23 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
   }
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, buffer_size, external_slots);
+      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size, external_slots);
   ASSERT_OK(stream.Init(0, false));
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_reservation));
+  ASSERT_TRUE(got_reservation);
 
   for (int i = 0; i < num_batches; ++i) {
     RowBatch* batch = CreateStringBatch(rows_added, BATCH_SIZE, false);
     for (int j = 0; j < batch->num_rows(); ++j) {
-      uint8_t* varlen_data;
       int fixed_size = tuple_desc.byte_size();
-      uint8_t* tuple = stream.AllocateRow(fixed_size, 0, &varlen_data, &status);
-      ASSERT_TRUE(tuple != nullptr);
-      ASSERT_TRUE(status.ok());
       // Copy fixed portion in, but leave it pointing to row batch's varlen data.
-      memcpy(tuple, batch->GetRow(j)->GetTuple(0), fixed_size);
+      ASSERT_TRUE(stream.AddRowCustom(fixed_size,
+          [batch, fixed_size, j](uint8_t* tuple_data) {
+            memcpy(tuple_data, batch->GetRow(j)->GetTuple(0), fixed_size);
+          },
+          &status));
+      ASSERT_TRUE(status.ok());
     }
     rows_added += batch->num_rows();
   }
@@ -766,49 +824,102 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 // will be close to the IO block size. With null indicators, stream will fail to
 // be initialized; Without null indicators, things should work fine.
 TEST_F(SimpleTupleStreamTest, BigRow) {
-  Init(2 * PAGE_LEN);
-  vector<TupleId> tuple_ids;
-  vector<bool> nullable_tuples;
-  vector<bool> non_nullable_tuples;
-
-  DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_);
-  // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator.
-  const int num_tuples = PAGE_LEN / (8 * sizeof(int) + 1);
-  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-    big_row_builder.DeclareTuple() << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT
-                                   << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT;
-    tuple_ids.push_back(static_cast<TTupleId>(tuple_idx));
-    nullable_tuples.push_back(true);
-    non_nullable_tuples.push_back(false);
-  }
-  DescriptorTbl* desc = big_row_builder.Build();
+  const int64_t MAX_BUFFERS = 10;
+  Init(MAX_BUFFERS * BIG_ROW_BYTES);
 
-  // Construct a big row with all non-nullable tuples.
-  RowDescriptor* row_desc =
-      pool_.Add(new RowDescriptor(*desc, tuple_ids, non_nullable_tuples));
-  ASSERT_FALSE(row_desc->IsAnyTupleNullable());
   // Test writing this row into the stream and then reading it back.
-  TestValues<int>(1, row_desc, false, false, PAGE_LEN, 1);
-  TestValues<int>(1, row_desc, false, true, PAGE_LEN, 1);
+  // Make sure to exercise the case where the row is larger than the default page.
+  // If the stream is pinned, we can only fit MAX_BUFFERS - 1 rows (since we always
+  // advance to the next page). In the unpinned case we should be able to write
+  // arbitrarily many rows.
+  TestValues<int>(1, big_row_desc_, false, false, BIG_ROW_BYTES, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      MAX_BUFFERS - 1, big_row_desc_, false, false, BIG_ROW_BYTES, BIG_ROW_BYTES, 1);
+  TestValues<int>(1, big_row_desc_, false, false, BIG_ROW_BYTES / 4, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      MAX_BUFFERS - 1, big_row_desc_, false, false, BIG_ROW_BYTES / 4, BIG_ROW_BYTES, 1);
+  TestValues<int>(1, big_row_desc_, false, true, BIG_ROW_BYTES, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      MAX_BUFFERS - 1, big_row_desc_, false, true, BIG_ROW_BYTES, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      5 * MAX_BUFFERS, big_row_desc_, false, true, BIG_ROW_BYTES, BIG_ROW_BYTES, 1);
+  TestValues<int>(1, big_row_desc_, false, true, BIG_ROW_BYTES / 4, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      MAX_BUFFERS - 1, big_row_desc_, false, true, BIG_ROW_BYTES / 4, BIG_ROW_BYTES, 1);
+  TestValues<int>(
+      5 * MAX_BUFFERS, big_row_desc_, false, true, BIG_ROW_BYTES / 4, BIG_ROW_BYTES, 1);
+
+  // Test the case where it fits in an in-between page size.
+  TestValues<int>(MAX_BUFFERS - 1, big_row_desc_, false, false, BIG_ROW_BYTES / 4,
+      BIG_ROW_BYTES * 2, 1);
+  TestValues<int>(MAX_BUFFERS - 1, big_row_desc_, false, true, BIG_ROW_BYTES / 4,
+      BIG_ROW_BYTES * 2, 1);
 
   // Construct a big row with nullable tuples. This requires extra space for null
   // indicators in the stream so adding the row will fail.
-  RowDescriptor* nullable_row_desc =
-      pool_.Add(new RowDescriptor(*desc, tuple_ids, nullable_tuples));
-  ASSERT_TRUE(nullable_row_desc->IsAnyTupleNullable());
+  ASSERT_TRUE(nullable_big_row_desc_->IsAnyTupleNullable());
   BufferedTupleStreamV2 nullable_stream(
-      runtime_state_, *nullable_row_desc, &client_, PAGE_LEN);
+      runtime_state_, *nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
   ASSERT_OK(nullable_stream.Init(-1, true));
   bool got_reservation;
-  Status status = nullable_stream.PrepareForWrite(&got_reservation);
-  EXPECT_EQ(TErrorCode::BTS_BLOCK_OVERFLOW, status.code());
+  ASSERT_OK(nullable_stream.PrepareForWrite(&got_reservation));
+
+  // With null tuples, a row can fit in the stream.
+  RowBatch* batch = CreateBatch(*nullable_big_row_desc_, 0, 1, true);
+  Status status;
+  EXPECT_TRUE(nullable_stream.AddRow(batch->GetRow(0), &status));
+  // With the additional null indicator, we can't fit all the tuples of a row into
+  // the stream.
+  batch = CreateBatch(*nullable_big_row_desc_, 0, 1, false);
+  EXPECT_FALSE(nullable_stream.AddRow(batch->GetRow(0), &status));
+  EXPECT_EQ(TErrorCode::MAX_ROW_SIZE, status.code());
   nullable_stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+// Test the memory use for large rows.
+TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
+  const int64_t MAX_BUFFERS = 10;
+  const int64_t DEFAULT_PAGE_LEN = BIG_ROW_BYTES / 4;
+  Init(MAX_BUFFERS * BIG_ROW_BYTES);
+  Status status;
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+  ASSERT_OK(stream.Init(-1, true));
+  RowBatch* batch;
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_reservation));
+  ASSERT_TRUE(got_reservation);
+  // We should be able to append MAX_BUFFERS without problem.
+  for (int i = 0; i < MAX_BUFFERS; ++i) {
+    batch = CreateBatch(*big_row_desc_, i, 1, false);
+    bool success = stream.AddRow(batch->GetRow(0), &status);
+    ASSERT_TRUE(success);
+    // We should have one large page per row.
+    EXPECT_EQ(BIG_ROW_BYTES * (i + 1), client_.GetUsedReservation())
+        << i << ": " << client_.DebugString();
+  }
+
+  // We can't fit another row in memory - need to unpin to make progress.
+  batch = CreateBatch(*big_row_desc_, MAX_BUFFERS, 1, false);
+  bool success = stream.AddRow(batch->GetRow(0), &status);
+  ASSERT_FALSE(success);
+  ASSERT_OK(status);
+  stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+  success = stream.AddRow(batch->GetRow(0), &status);
+  ASSERT_TRUE(success);
+  // Read all the rows back and verify.
+  ASSERT_OK(stream.PrepareForRead(false, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  vector<int> results;
+  ReadValues(&stream, big_row_desc_, &results);
+  VerifyResults<int>(*big_row_desc_, results, MAX_BUFFERS + 1, false);
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
 // Test for IMPALA-3923: overflow of 32-bit int in GetRows().
 TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   Init(BUFFER_POOL_LIMIT);
-  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, PAGE_LEN);
+  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, PAGE_LEN, PAGE_LEN);
   ASSERT_OK(stream.Init(-1, true));
 
   Status status;
@@ -822,38 +933,132 @@ TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+// Test rows greater than the default page size. Also exercise the read/write
+// mode with large pages.
+TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
+  const int64_t MAX_BUFFERS = 10;
+  const int64_t DEFAULT_PAGE_LEN = BIG_ROW_BYTES / 4;
+  Init(MAX_BUFFERS * BIG_ROW_BYTES);
+  Status status;
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+  ASSERT_OK(stream.Init(-1, true));
+  RowBatch write_batch(*string_desc_, 1024, &tracker_);
+  RowBatch read_batch(*string_desc_, 1024, &tracker_);
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForReadWrite(false, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  TupleRow* write_row = write_batch.GetRow(0);
+  TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[0];
+  vector<uint8_t> tuple_mem(tuple_desc->byte_size());
+  Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
+  write_row->SetTuple(0, write_tuple);
+  StringValue* write_str = reinterpret_cast<StringValue*>(
+      write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+  // Make the string large enough to fill a page.
+  const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
+  vector<char> data(string_len);
+  write_str->len = string_len;
+  write_str->ptr = data.data();
+
+  // We should be able to append MAX_BUFFERS without problem.
+  for (int i = 0; i < MAX_BUFFERS; ++i) {
+    // Fill the string with the value i.
+    memset(write_str->ptr, i, write_str->len);
+    bool success = stream.AddRow(write_row, &status);
+    ASSERT_TRUE(success);
+    // We should have one large page per row, plus a default-size read/write page, plus
+    // we waste the first default-size page in the stream by leaving it empty.
+    EXPECT_EQ(BIG_ROW_BYTES * (i + 1), client_.GetUsedReservation())
+        << i << ": " << client_.DebugString() << "\n"
+        << stream.DebugString();
+
+    // Read back the rows as we write them to test read/write mode.
+    read_batch.Reset();
+    bool eos;
+    ASSERT_OK(stream.GetNext(&read_batch, &eos));
+    EXPECT_EQ(1, read_batch.num_rows());
+    EXPECT_TRUE(eos);
+    Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
+    StringValue* str = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    EXPECT_EQ(string_len, str->len);
+    for (int j = 0; j < string_len; ++j) {
+      EXPECT_EQ(i, str->ptr[j]) << j;
+    }
+  }
+
+  // We can't fit another row in memory - need to unpin to make progress.
+  memset(write_str->ptr, MAX_BUFFERS, write_str->len);
+  bool success = stream.AddRow(write_row, &status);
+  ASSERT_FALSE(success);
+  ASSERT_OK(status);
+  stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+  success = stream.AddRow(write_row, &status);
+  ASSERT_TRUE(success);
+
+  // Read all the rows back and verify.
+  ASSERT_OK(stream.PrepareForRead(false, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  for (int i = 0; i < MAX_BUFFERS + 1; ++i) {
+    read_batch.Reset();
+    bool eos;
+    ASSERT_OK(stream.GetNext(&read_batch, &eos));
+    EXPECT_EQ(1, read_batch.num_rows());
+    EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
+    Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
+    StringValue* str = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    EXPECT_EQ(string_len, str->len);
+    for (int j = 0; j < string_len; ++j) {
+      ASSERT_EQ(i, str->ptr[j]) << j;
+    }
+  }
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleNullStreamTest, Basic) {
   Init(BUFFER_POOL_LIMIT);
+  TestValues<int>(0, int_desc_, false, true);
   TestValues<int>(1, int_desc_, false, true);
   TestValues<int>(10, int_desc_, false, true);
   TestValues<int>(100, int_desc_, false, true);
+  TestValues<int>(0, int_desc_, true, true);
   TestValues<int>(1, int_desc_, true, true);
   TestValues<int>(10, int_desc_, true, true);
   TestValues<int>(100, int_desc_, true, true);
+  TestValues<int>(0, int_desc_, false, false);
   TestValues<int>(1, int_desc_, false, false);
   TestValues<int>(10, int_desc_, false, false);
   TestValues<int>(100, int_desc_, false, false);
+  TestValues<int>(0, int_desc_, true, false);
   TestValues<int>(1, int_desc_, true, false);
   TestValues<int>(10, int_desc_, true, false);
   TestValues<int>(100, int_desc_, true, false);
 
+  TestValues<StringValue>(0, string_desc_, false, true);
   TestValues<StringValue>(1, string_desc_, false, true);
   TestValues<StringValue>(10, string_desc_, false, true);
   TestValues<StringValue>(100, string_desc_, false, true);
+  TestValues<StringValue>(0, string_desc_, true, true);
   TestValues<StringValue>(1, string_desc_, true, true);
   TestValues<StringValue>(10, string_desc_, true, true);
   TestValues<StringValue>(100, string_desc_, true, true);
+  TestValues<StringValue>(0, string_desc_, false, false);
   TestValues<StringValue>(1, string_desc_, false, false);
   TestValues<StringValue>(10, string_desc_, false, false);
   TestValues<StringValue>(100, string_desc_, false, false);
+  TestValues<StringValue>(0, string_desc_, true, false);
   TestValues<StringValue>(1, string_desc_, true, false);
   TestValues<StringValue>(10, string_desc_, true, false);
   TestValues<StringValue>(100, string_desc_, true, false);
 
+  TestIntValuesInterleaved(0, 1, true);
   TestIntValuesInterleaved(1, 1, true);
   TestIntValuesInterleaved(10, 5, true);
   TestIntValuesInterleaved(100, 15, true);
+  TestIntValuesInterleaved(0, 1, false);
   TestIntValuesInterleaved(1, 1, false);
   TestIntValuesInterleaved(10, 5, false);
   TestIntValuesInterleaved(100, 15, false);
@@ -864,9 +1069,11 @@ TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) {
   // Each buffer can only hold 128 ints, so this spills quite often.
   int buffer_size = 128 * sizeof(int);
   Init(buffer_size);
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
 
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
 }
@@ -876,10 +1083,12 @@ TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
   int buffer_size = 128 * sizeof(int);
   Init(10 * buffer_size);
 
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
   TestValues<int>(100, int_desc_, false, true, buffer_size);
 
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
@@ -891,7 +1100,7 @@ TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
 
 // Test that we can allocate a row in the stream and copy in multiple tuples then
 // read it back from the stream.
-TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
+TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   // Use small buffers so it will be flushed to disk.
   int buffer_size = 4 * 1024;
   Init(2 * buffer_size);
@@ -899,7 +1108,8 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
 
   int num_batches = 1;
   int rows_added = 0;
-  BufferedTupleStreamV2 stream(runtime_state_, *string_desc_, &client_, buffer_size);
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, false));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -916,28 +1126,12 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
         fixed_size += tuple_desc->byte_size();
         varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
       }
-      uint8_t* varlen_data;
-      uint8_t* fixed_data =
-          stream.AllocateRow(fixed_size, varlen_size, &varlen_data, &status);
-      ASSERT_TRUE(fixed_data != nullptr);
+      ASSERT_TRUE(stream.AddRowCustom(fixed_size + varlen_size,
+          [this, row, fixed_size, varlen_size](uint8_t* data) {
+            WriteStringRow(string_desc_, row, fixed_size, varlen_size, data);
+          },
+          &status));
       ASSERT_TRUE(status.ok());
-      uint8_t* varlen_write_ptr = varlen_data;
-      for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
-        TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
-        Tuple* src = row->GetTuple(k);
-        Tuple* dst = reinterpret_cast<Tuple*>(fixed_data);
-        fixed_data += tuple_desc->byte_size();
-        memcpy(dst, src, tuple_desc->byte_size());
-        for (int l = 0; l < tuple_desc->slots().size(); l++) {
-          SlotDescriptor* slot = tuple_desc->slots()[l];
-          StringValue* src_string = src->GetStringSlot(slot->tuple_offset());
-          StringValue* dst_string = dst->GetStringSlot(slot->tuple_offset());
-          dst_string->ptr = reinterpret_cast<char*>(varlen_write_ptr);
-          memcpy(dst_string->ptr, src_string->ptr, src_string->len);
-          varlen_write_ptr += src_string->len;
-        }
-      }
-      ASSERT_EQ(varlen_data + varlen_size, varlen_write_ptr);
     }
     rows_added += batch->num_rows();
   }
@@ -955,18 +1149,43 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+void SimpleTupleStreamTest::WriteStringRow(const RowDescriptor* row_desc, TupleRow* row,
+    int64_t fixed_size, int64_t varlen_size, uint8_t* data) {
+  uint8_t* fixed_data = data;
+  uint8_t* varlen_write_ptr = data + fixed_size;
+  for (int i = 0; i < row_desc->tuple_descriptors().size(); i++) {
+    TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[i];
+    Tuple* src = row->GetTuple(i);
+    Tuple* dst = reinterpret_cast<Tuple*>(fixed_data);
+    fixed_data += tuple_desc->byte_size();
+    memcpy(dst, src, tuple_desc->byte_size());
+    for (SlotDescriptor* slot : tuple_desc->slots()) {
+      StringValue* src_string = src->GetStringSlot(slot->tuple_offset());
+      StringValue* dst_string = dst->GetStringSlot(slot->tuple_offset());
+      dst_string->ptr = reinterpret_cast<char*>(varlen_write_ptr);
+      memcpy(dst_string->ptr, src_string->ptr, src_string->len);
+      varlen_write_ptr += src_string->len;
+    }
+  }
+  ASSERT_EQ(data + fixed_size + varlen_size, varlen_write_ptr);
+}
+
 // Test with rows with multiple nullable tuples.
 TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) {
   // Each buffer can only hold 128 ints, so this spills quite often.
   int buffer_size = 128 * sizeof(int);
   Init(buffer_size);
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
+  TestValues<int>(0, int_desc_, true, true, buffer_size);
   TestValues<int>(1, int_desc_, true, true, buffer_size);
   TestValues<int>(10, int_desc_, true, true, buffer_size);
 
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(0, string_desc_, true, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, true, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, true, true, buffer_size);
 }
@@ -976,20 +1195,25 @@ TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) {
   int buffer_size = 128 * sizeof(int);
   Init(10 * buffer_size);
 
+  TestValues<int>(0, int_desc_, false, true, buffer_size);
   TestValues<int>(1, int_desc_, false, true, buffer_size);
   TestValues<int>(10, int_desc_, false, true, buffer_size);
   TestValues<int>(100, int_desc_, false, true, buffer_size);
+  TestValues<int>(0, int_desc_, true, true, buffer_size);
   TestValues<int>(1, int_desc_, true, true, buffer_size);
   TestValues<int>(10, int_desc_, true, true, buffer_size);
   TestValues<int>(100, int_desc_, true, true, buffer_size);
 
+  TestValues<StringValue>(0, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
   TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(0, string_desc_, true, true, buffer_size);
   TestValues<StringValue>(1, string_desc_, true, true, buffer_size);
   TestValues<StringValue>(10, string_desc_, true, true, buffer_size);
   TestValues<StringValue>(100, string_desc_, true, true, buffer_size);
 
+  TestIntValuesInterleaved(0, 1, true, buffer_size);
   TestIntValuesInterleaved(1, 1, true, buffer_size);
   TestIntValuesInterleaved(10, 5, true, buffer_size);
   TestIntValuesInterleaved(100, 15, true, buffer_size);
@@ -1005,7 +1229,7 @@ TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_string_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, PAGE_LEN, external_slots);
+      runtime_state_, *string_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(
@@ -1055,7 +1279,8 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   Status status;
   Init(BUFFER_POOL_LIMIT);
   const int NUM_ROWS = 4000;
-  BufferedTupleStreamV2 stream(runtime_state_, *array_desc_, &client_, PAGE_LEN);
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN);
   const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
   // Write out a predictable pattern of data by iterating over arrays of constants.
   int strings_index = 0; // we take the mod of this as index into STRINGS.
@@ -1169,7 +1394,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_array_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *array_desc_, &client_, PAGE_LEN, external_slots);
+      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index c36c31f..424678b 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -48,33 +48,40 @@ using BufferHandle = BufferPool::BufferHandle;
 
 BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
     const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
-    int64_t page_len, const set<SlotId>& ext_varlen_slots)
+    int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
+    node_id_(-1),
     buffer_pool_(state->exec_env()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
     total_byte_size_(0),
+    has_read_iterator_(false),
+    read_page_reservation_(buffer_pool_client_),
     read_page_rows_returned_(-1),
     read_ptr_(nullptr),
     read_end_ptr_(nullptr),
     write_ptr_(nullptr),
     write_end_ptr_(nullptr),
     rows_returned_(0),
+    has_write_iterator_(false),
     write_page_(nullptr),
+    write_page_reservation_(buffer_pool_client_),
     bytes_pinned_(0),
     num_rows_(0),
-    page_len_(page_len),
+    default_page_len_(default_page_len),
+    max_page_len_(max_page_len),
     has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
     delete_on_read_(false),
     closed_(false),
     pinned_(true) {
+  DCHECK_GE(max_page_len, default_page_len);
+  DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
+  DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
   read_page_ = pages_.end();
-  fixed_tuple_row_size_ = 0;
   for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
     const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
-    fixed_tuple_row_size_ += tuple_byte_size;
 
     vector<SlotDescriptor*> tuple_string_slots;
     vector<SlotDescriptor*> tuple_coll_slots;
@@ -98,7 +105,6 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
       inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots));
     }
   }
-  if (has_nullable_tuple_) fixed_tuple_row_size_ += NullIndicatorBytesPerRow();
 }
 
 BufferedTupleStreamV2::~BufferedTupleStreamV2() {
@@ -109,8 +115,13 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
   for (const Page& page : pages_) {
     DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString();
+    // Only one large row per page.
+    if (page.len() > default_page_len_) DCHECK_LE(page.num_rows, 1);
+    // We only create pages when we have a row to append to them.
+    DCHECK_GT(page.num_rows, 0);
   }
-  if (has_write_iterator()) {
+  DCHECK(has_write_iterator() || write_page_ == nullptr);
+  if (write_page_ != nullptr) {
     DCHECK(write_page_->is_pinned());
     DCHECK(write_page_->retrieved_buffer);
     const BufferHandle* write_buffer;
@@ -120,28 +131,54 @@ void BufferedTupleStreamV2::CheckConsistency() const {
     DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len());
     DCHECK_GE(write_end_ptr_, write_ptr_);
   }
-  if (has_read_iterator()) {
+  DCHECK(has_read_iterator() || read_page_ == pages_.end());
+  if (read_page_ != pages_.end()) {
     DCHECK(read_page_->is_pinned());
     DCHECK(read_page_->retrieved_buffer);
     // Can't check read buffer without affecting behaviour, because a read may be in
     // flight and this would required blocking on that write.
     DCHECK_GE(read_end_ptr_, read_ptr_);
   }
+  if (NeedReadReservation()) {
+    DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
+        << DebugString();
+  } else if (!read_page_reservation_.is_closed()) {
+    DCHECK_EQ(0, read_page_reservation_.GetReservation());
+  }
+  if (NeedWriteReservation()) {
+    DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation());
+  } else if (!write_page_reservation_.is_closed()) {
+    DCHECK_EQ(0, write_page_reservation_.GetReservation());
+  }
 }
 
 string BufferedTupleStreamV2::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
      << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_
-     << " bytes_pinned=" << bytes_pinned_ << " write_page=" << write_page_
+     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
+     << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
+     << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
      << " read_page=";
-  if (!has_read_iterator()) {
+  if (read_page_ == pages_.end()) {
     ss << "<end>";
   } else {
     ss << &*read_page_;
   }
-  ss << " pages=[\n";
+  ss << "\n"
+     << " read_page_reservation=";
+  if (read_page_reservation_.is_closed()) {
+    ss << "<closed>";
+  } else {
+    ss << read_page_reservation_.GetReservation();
+  }
+  ss << " write_page_reservation=";
+  if (write_page_reservation_.is_closed()) {
+    ss << "<closed>";
+  } else {
+    ss << write_page_reservation_.GetReservation();
+  }
+  ss << "\n # pages=" << pages_.size() << " pages=[\n";
   for (const Page& page : pages_) {
     ss << "{" << page.DebugString() << "}";
     if (&page != &pages_.back()) ss << ",\n";
@@ -156,6 +193,7 @@ string BufferedTupleStreamV2::Page::DebugString() const {
 
 Status BufferedTupleStreamV2::Init(int node_id, bool pinned) {
   if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
+  node_id_ = node_id;
   return Status::OK();
 }
 
@@ -167,10 +205,11 @@ Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) {
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY();
 
-  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
-  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  RETURN_IF_ERROR(NewWritePage());
+  has_write_iterator_ = true;
+  // Save reservation for the write iterators.
+  buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   CHECK_CONSISTENCY();
   return Status::OK();
 }
@@ -184,10 +223,12 @@ Status BufferedTupleStreamV2::PrepareForReadWrite(
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY();
 
-  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
-  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * page_len_);
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_);
   if (!*got_reservation) return Status::OK();
-  RETURN_IF_ERROR(NewWritePage());
+  has_write_iterator_ = true;
+  // Save reservation for both the read and write iterators.
+  buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
+  buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
   return Status::OK();
 }
@@ -206,6 +247,8 @@ void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
     }
   }
+  read_page_reservation_.Close();
+  write_page_reservation_.Close();
   pages_.clear();
   bytes_pinned_ = 0;
   closed_ = true;
@@ -224,24 +267,7 @@ Status BufferedTupleStreamV2::PinPage(Page* page) {
 }
 
 int BufferedTupleStreamV2::ExpectedPinCount(bool stream_pinned, const Page* page) const {
-  int pin_count = 0;
-  if (stream_pinned && has_write_iterator() && has_read_iterator()) {
-    // The stream is pinned, so all pages have a pin for that (and this pin will be used
-    // as the read iterator when the stream is unpinned)
-    pin_count++;
-    // The write iterator gets it's own pin so that we can unpin the stream without
-    // needing additional reservation.
-    if (is_write_page(page)) pin_count++;
-  } else if (stream_pinned) {
-    // The stream is pinned and only has one iterator. When it's unpinned, either the read
-    // or write iterator can use this pin count.
-    pin_count++;
-  } else {
-    // The stream is unpinned. Each iterator gets a pin count.
-    if (is_read_page(page)) pin_count++;
-    if (is_write_page(page)) pin_count++;
-  }
-  return pin_count;
+  return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
 }
 
 Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) {
@@ -263,66 +289,145 @@ void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
   }
 }
 
-Status BufferedTupleStreamV2::NewWritePage() noexcept {
+bool BufferedTupleStreamV2::NeedWriteReservation() const {
+  return NeedWriteReservation(pinned_);
+}
+
+bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned) const {
+  return NeedWriteReservation(stream_pinned, pages_.size(), has_write_iterator(),
+      write_page_ != nullptr, has_read_write_page());
+}
+
+bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned, int64_t num_pages,
+    bool has_write_iterator, bool has_write_page, bool has_read_write_page) {
+  if (!has_write_iterator) return false;
+  // If the stream is empty the write reservation hasn't been used yet.
+  if (num_pages == 0) return true;
+  if (stream_pinned) {
+    // Make sure we've saved the write reservation for the next page if the only
+    // page is a read/write page.
+    return has_read_write_page && num_pages == 1;
+  } else {
+    // Make sure we've saved the write reservation if it's not being used to pin
+    // a page in the stream.
+    return !has_write_page || has_read_write_page;
+  }
+}
+
+bool BufferedTupleStreamV2::NeedReadReservation() const {
+  return NeedReadReservation(pinned_);
+}
+
+bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned) const {
+  return NeedReadReservation(
+      stream_pinned, pages_.size(), has_read_iterator(), read_page_ != pages_.end());
+}
+
+bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned, int64_t num_pages,
+    bool has_read_iterator, bool has_read_page) const {
+  return NeedReadReservation(stream_pinned, num_pages, has_read_iterator, has_read_page,
+      has_write_iterator(), write_page_ != nullptr);
+}
+
+bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned, int64_t num_pages,
+    bool has_read_iterator, bool has_read_page, bool has_write_iterator,
+    bool has_write_page) {
+  if (!has_read_iterator) return false;
+  if (stream_pinned) {
+    // Need reservation if there are no pages currently pinned for reading but we may add
+    // a page.
+    return num_pages == 0 && has_write_iterator;
+  } else {
+    // Only need to save reservation for an unpinned stream if there is no read page
+    // and we may advance to one in the future.
+    return (has_write_iterator || num_pages > 0) && !has_read_page;
+  }
+}
+
+Status BufferedTupleStreamV2::NewWritePage(int64_t page_len) noexcept {
   DCHECK(!closed_);
-  DCHECK(!has_write_iterator());
+  DCHECK(write_page_ == nullptr);
 
   Page new_page;
   const BufferHandle* write_buffer;
   RETURN_IF_ERROR(buffer_pool_->CreatePage(
-      buffer_pool_client_, page_len_, &new_page.handle, &write_buffer));
-  bytes_pinned_ += page_len_;
-  total_byte_size_ += page_len_;
+      buffer_pool_client_, page_len, &new_page.handle, &write_buffer));
+  bytes_pinned_ += page_len;
+  total_byte_size_ += page_len;
 
   pages_.push_back(std::move(new_page));
   write_page_ = &pages_.back();
   DCHECK_EQ(write_page_->num_rows, 0);
   write_ptr_ = write_buffer->data();
-  write_end_ptr_ = write_ptr_ + page_len_;
+  write_end_ptr_ = write_ptr_ + page_len;
   return Status::OK();
 }
 
-Status BufferedTupleStreamV2::CheckPageSizeForRow(int64_t row_size) {
-  // TODO: IMPALA-3208: need to rework this logic to support large pages - should pick
-  // next power-of-two size.
-  if (UNLIKELY(row_size > page_len_)) {
-    // TODO: IMPALA-3208: change the message to reference the query option controlling
-    // max row size.
-    return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
-        PrettyPrinter::Print(row_size, TUnit::BYTES),
+Status BufferedTupleStreamV2::CalcPageLenForRow(int64_t row_size, int64_t* page_len) {
+  if (UNLIKELY(row_size > max_page_len_)) {
+    return Status(TErrorCode::MAX_ROW_SIZE,
+        PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_,
         PrettyPrinter::Print(0, TUnit::BYTES));
   }
+  *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size));
   return Status::OK();
 }
 
 Status BufferedTupleStreamV2::AdvanceWritePage(
     int64_t row_size, bool* got_reservation) noexcept {
+  DCHECK(has_write_iterator());
   CHECK_CONSISTENCY();
 
-  // Get ready to move to the next write page by unsetting 'write_page_' and
-  // potentially (depending on the mode of this stream) freeing up reservation for the
-  // next write page.
-  ResetWritePage();
+  int64_t page_len;
+  RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len));
 
-  RETURN_IF_ERROR(CheckPageSizeForRow(row_size));
-  // May need to pin the new page for both reading and writing. See ExpectedPinCount();
-  bool pin_for_read = has_read_iterator() && pinned_;
-  int64_t new_page_reservation = pin_for_read ? 2 * page_len_ : page_len_;
-  if (!buffer_pool_client_->IncreaseReservationToFit(new_page_reservation)) {
+  // Reservation may have been saved for the next write page, e.g. by PrepareForWrite()
+  // if the stream is empty.
+  int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0;
+  if (NeedWriteReservation(
+          pinned_, pages_.size(), true, write_page_ != nullptr, has_read_write_page())
+      && !NeedWriteReservation(pinned_, pages_.size() + 1, true, true, false)) {
+    write_reservation_to_restore = default_page_len_;
+  }
+  // If the stream is pinned, we need to keep the previous write page pinned for reading.
+  // Check if we saved reservation for this case.
+  if (NeedReadReservation(pinned_, pages_.size(), has_read_iterator(),
+          read_page_ != pages_.end(), true, write_page_ != nullptr)
+      && !NeedReadReservation(pinned_, pages_.size() + 1, has_read_iterator(),
+             read_page_ != pages_.end(), true, true)) {
+    read_reservation_to_restore = default_page_len_;
+  }
+
+  // We may reclaim reservation by unpinning a page that was pinned for writing.
+  int64_t write_page_reservation_to_reclaim =
+      (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ?
+      write_page_->len() : 0;
+  // Check to see if we can get the reservation before changing the state of the stream.
+  if (!buffer_pool_client_->IncreaseReservationToFit(page_len
+          - write_reservation_to_restore - read_reservation_to_restore
+          - write_page_reservation_to_reclaim)) {
+    DCHECK(pinned_ || page_len > default_page_len_)
+        << "If the stream is unpinned, this should only fail for large pages";
+    CHECK_CONSISTENCY();
     *got_reservation = false;
     return Status::OK();
   }
-  RETURN_IF_ERROR(NewWritePage());
-  // We may need to pin the page for reading also.
-  if (pin_for_read) RETURN_IF_ERROR(PinPage(write_page_));
-
-  CHECK_CONSISTENCY();
+  if (write_reservation_to_restore > 0) {
+    buffer_pool_client_->RestoreReservation(
+        &write_page_reservation_, write_reservation_to_restore);
+  }
+  if (read_reservation_to_restore > 0) {
+    buffer_pool_client_->RestoreReservation(
+        &read_page_reservation_, read_reservation_to_restore);
+  }
+  ResetWritePage();
+  RETURN_IF_ERROR(NewWritePage(page_len));
   *got_reservation = true;
   return Status::OK();
 }
 
 void BufferedTupleStreamV2::ResetWritePage() {
-  if (!has_write_iterator()) return;
+  if (write_page_ == nullptr) return;
   // Unpin the write page if we're reading in unpinned mode.
   Page* prev_write_page = write_page_;
   write_page_ = nullptr;
@@ -334,12 +439,37 @@ void BufferedTupleStreamV2::ResetWritePage() {
   UnpinPageIfNeeded(prev_write_page, pinned_);
 }
 
+void BufferedTupleStreamV2::InvalidateWriteIterator() {
+  if (!has_write_iterator()) return;
+  ResetWritePage();
+  has_write_iterator_ = false;
+  // No more pages will be appended to stream - do not need any write reservation.
+  write_page_reservation_.Close();
+  // May not need a read reservation once the write iterator is invalidated.
+  if (NeedReadReservation(pinned_, pages_.size(), has_read_iterator(),
+          read_page_ != pages_.end(), true, write_page_ != nullptr)
+      && !NeedReadReservation(pinned_, pages_.size(), has_read_iterator(),
+             read_page_ != pages_.end(), false, false)) {
+    buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
+  }
+}
+
 Status BufferedTupleStreamV2::NextReadPage() {
+  DCHECK(has_read_iterator());
   DCHECK(!closed_);
   CHECK_CONSISTENCY();
 
-  if (delete_on_read_) {
-    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " << DebugString();
+  if (read_page_ == pages_.end()) {
+    // No rows read yet - start reading at first page. If the stream is unpinned, we can
+    // use the reservation saved in PrepareForReadWrite() to pin the first page.
+    read_page_ = pages_.begin();
+    if (NeedReadReservation(pinned_, pages_.size(), true, false)
+        && !NeedReadReservation(pinned_, pages_.size(), true, true)) {
+      buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
+    }
+  } else if (delete_on_read_) {
+    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
+                                         << DebugString();
     DCHECK_NE(&*read_page_, write_page_);
     bytes_pinned_ -= pages_.front().len();
     buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
@@ -352,15 +482,27 @@ Status BufferedTupleStreamV2::NextReadPage() {
     UnpinPageIfNeeded(prev_read_page, pinned_);
   }
 
-  if (!has_read_iterator()) {
+  if (read_page_ == pages_.end()) {
     CHECK_CONSISTENCY();
     return Status::OK();
   }
 
-  // Ensure the next page is pinned for reading. If the stream is unpinned, we freed up
-  // enough reservation by deleting or unpinning the previous page.
-  // TODO: IMPALA-3208: this page may be larger than the previous, so this could
-  // actually fail once we have variable-length pages.
+  if (!pinned_ && read_page_->len() > default_page_len_
+      && buffer_pool_client_->GetUnusedReservation() < read_page_->len()) {
+    // If we are iterating over an unpinned stream and encounter a page that is larger
+    // than the default page length, then unpinning the previous page may not have
+    // freed up enough reservation to pin the next one. The client is responsible for
+    // ensuring the reservation is available, so this indicates a bug.
+    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin "
+          "large page of $0 bytes, client only had $1 bytes of unused reservation:\n$2",
+          read_page_->len(), buffer_pool_client_->GetUnusedReservation(),
+          buffer_pool_client_->DebugString()));
+  }
+  // Ensure the next page is pinned for reading. By this point we should have enough
+  // reservation to pin the page. If the stream is pinned, the page is already pinned.
+  // If the stream is unpinned, we freed up enough memory for a default-sized page by
+  // deleting or unpinning the previous page and ensured that, if the page was larger,
+  // that the reservation is available with the above check.
   RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
 
   // This waits for the pin to complete if the page was unpinned earlier.
@@ -371,28 +513,42 @@ Status BufferedTupleStreamV2::NextReadPage() {
   read_ptr_ = read_buffer->data();
   read_end_ptr_ = read_ptr_ + read_buffer->len();
 
+  // We may need to save reservation for the write page in the case when the write page
+  // became a read/write page.
+  if (!NeedWriteReservation(pinned_, pages_.size(), has_write_iterator(),
+             write_page_ != nullptr, false)
+      && NeedWriteReservation(pinned_, pages_.size(), has_write_iterator(),
+             write_page_ != nullptr, has_read_write_page())) {
+    buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
+  }
   CHECK_CONSISTENCY();
   return Status::OK();
 }
 
-void BufferedTupleStreamV2::ResetReadPage() {
-  if (!has_read_iterator()) return;
-  // Unpin the write page if we're reading in unpinned mode.
-  Page* prev_read_page = &*read_page_;
-  read_page_ = pages_.end();
-  read_ptr_ = nullptr;
-  read_end_ptr_ = nullptr;
+void BufferedTupleStreamV2::InvalidateReadIterator() {
+  if (read_page_ != pages_.end()) {
+    // Unpin the write page if we're reading in unpinned mode.
+    Page* prev_read_page = &*read_page_;
+    read_page_ = pages_.end();
+    read_ptr_ = nullptr;
+    read_end_ptr_ = nullptr;
 
-  // May need to decrement pin count after destroying read iterator.
-  UnpinPageIfNeeded(prev_read_page, pinned_);
+    // May need to decrement pin count after destroying read iterator.
+    UnpinPageIfNeeded(prev_read_page, pinned_);
+  }
+  has_read_iterator_ = false;
+  if (read_page_reservation_.GetReservation() > 0) {
+    buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
+  }
 }
 
 Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) {
   CHECK_CONSISTENCY();
-  ResetWritePage();
-  ResetReadPage();
+  InvalidateWriteIterator();
+  InvalidateReadIterator();
   // If already pinned, no additional pin is needed (see ExpectedPinCount()).
-  *got_reservation = pinned_ || buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  *got_reservation = pinned_ || pages_.empty()
+      || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
   return PrepareForReadInternal(delete_on_read);
 }
@@ -400,22 +556,28 @@ Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_rese
 Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
   DCHECK(!closed_);
   DCHECK(!delete_on_read_);
-  DCHECK(!pages_.empty());
   DCHECK(!has_read_iterator());
 
-  // Check if we need to increment the pin count of the read page.
-  read_page_ = pages_.begin();
-  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
-
-  // This waits for the pin to complete if the page was unpinned earlier.
-  const BufferHandle* read_buffer;
-  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+  has_read_iterator_ = true;
+  if (pages_.empty()) {
+    // No rows to return, or a the first read/write page has not yet been allocated.
+    read_page_ = pages_.end();
+    read_ptr_ = nullptr;
+    read_end_ptr_ = nullptr;
+  } else {
+    // Eagerly pin the first page in the stream.
+    read_page_ = pages_.begin();
+    // Check if we need to increment the pin count of the read page.
+    RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+    DCHECK(read_page_->is_pinned());
 
-  DCHECK(has_read_iterator());
-  DCHECK(read_page_->is_pinned());
+    // This waits for the pin to complete if the page was unpinned earlier.
+    const BufferHandle* read_buffer;
+    RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+    read_ptr_ = read_buffer->data();
+    read_end_ptr_ = read_ptr_ + read_buffer->len();
+  }
   read_page_rows_returned_ = 0;
-  read_ptr_ = read_buffer->data();
-  read_end_ptr_ = read_ptr_ + read_buffer->len();
   rows_returned_ = 0;
   delete_on_read_ = delete_on_read;
   CHECK_CONSISTENCY();
@@ -435,9 +597,28 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
   for (Page& page : pages_) {
     bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len();
   }
-  bool reservation_granted = buffer_pool_client_->IncreaseReservationToFit(bytes_to_pin);
+
+  // Check if we have some reservation to restore.
+  bool restore_write_reservation =
+      NeedWriteReservation(false) && !NeedWriteReservation(true);
+  bool restore_read_reservation =
+      NeedReadReservation(false) && !NeedReadReservation(true);
+  int64_t increase_needed = bytes_to_pin
+      - (restore_write_reservation ? default_page_len_ : 0)
+      - (restore_read_reservation ? default_page_len_ : 0);
+  bool reservation_granted =
+      buffer_pool_client_->IncreaseReservationToFit(increase_needed);
   if (!reservation_granted) return Status::OK();
 
+  // If there is no current write page we should have some saved reservation to use.
+  // Only continue saving it if the stream is empty and need it to pin the first page.
+  if (restore_write_reservation) {
+    buffer_pool_client_->RestoreReservation(&write_page_reservation_, default_page_len_);
+  }
+  if (restore_read_reservation) {
+    buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
+  }
+
   // At this point success is guaranteed - go through to pin the pages we need to pin.
   // If the page data was evicted from memory, the read I/O can happen in parallel
   // because we defer calling GetBuffer() until NextReadPage().
@@ -450,17 +631,27 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
 }
 
 void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
+  CHECK_CONSISTENCY();
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
     // Invalidate the iterators so they don't keep pages pinned.
-    ResetWritePage();
-    ResetReadPage();
+    InvalidateWriteIterator();
+    InvalidateReadIterator();
   }
 
   if (pinned_) {
+    CHECK_CONSISTENCY();
     // If the stream was pinned, there may be some remaining pinned pages that should
     // be unpinned at this point.
     for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
+
+    // Check to see if we need to save some of the reservation we freed up.
+    if (!NeedWriteReservation(true) && NeedWriteReservation(false)) {
+      buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
+    }
+    if (!NeedReadReservation(true) && NeedReadReservation(false)) {
+      buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
+    }
     pinned_ = false;
   }
   CHECK_CONSISTENCY();
@@ -519,15 +710,17 @@ Status BufferedTupleStreamV2::GetNextInternal(
   *eos = (rows_returned_ == num_rows_);
   if (*eos) return Status::OK();
 
-  if (UNLIKELY(read_page_rows_returned_ == read_page_->num_rows)) {
-    // Get the next page in the stream. We need to do this at the beginning of the
-    // GetNext() call to ensure the buffer management semantics. NextReadPage() may
-    // unpin or delete the buffer backing the rows returned from the *previous* call
-    // to GetNext().
+  if (UNLIKELY(read_page_ == pages_.end()
+          || read_page_rows_returned_ == read_page_->num_rows)) {
+    // Get the next page in the stream (or the first page if read_page_ was not yet
+    // initialized.) We need to do this at the beginning of the GetNext() call to ensure
+    // the buffer management semantics. NextReadPage() may unpin or delete the buffer
+    // backing the rows returned from the *previous* call to GetNext().
     RETURN_IF_ERROR(NextReadPage());
   }
 
   DCHECK(has_read_iterator());
+  DCHECK(read_page_ != pages_.end());
   DCHECK(read_page_->is_pinned()) << DebugString();
   DCHECK_GE(read_page_rows_returned_, 0);
 
@@ -635,7 +828,9 @@ int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
       if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i];
     }
   } else {
-    size = fixed_tuple_row_size_;
+    for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
+      size += fixed_tuple_sizes_[i];
+    }
   }
   for (int i = 0; i < inlined_string_slots_.size(); ++i) {
     Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
@@ -668,55 +863,73 @@ int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
 }
 
 bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept {
-  bool got_reservation;
-  *status = AdvanceWritePage(ComputeRowSize(row), &got_reservation);
-  if (!status->ok() || !got_reservation) return false;
-  return DeepCopy(row);
+  // Use AddRowCustomSlow() to do the work of advancing the page.
+  int64_t row_size = ComputeRowSize(row);
+  return AddRowCustomSlow(row_size,
+      [this, row, row_size](uint8_t* data) {
+        bool success = DeepCopy(row, &data, data + row_size);
+        DCHECK(success);
+        DCHECK_EQ(data, write_ptr_);
+      },
+      status);
 }
 
-uint8_t* BufferedTupleStreamV2::AllocateRowSlow(
-    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept {
-  int64_t row_size = static_cast<int64_t>(fixed_size) + varlen_size;
+bool BufferedTupleStreamV2::AddRowCustomSlow(
+    int64_t size, const WriteRowFn& write_fn, Status* status) noexcept {
   bool got_reservation;
-  *status = AdvanceWritePage(row_size, &got_reservation);
-  if (!status->ok() || !got_reservation) return nullptr;
+  *status = AdvanceWritePage(size, &got_reservation);
+  if (!status->ok() || !got_reservation) return false;
 
   // We have a large-enough page so now success is guaranteed.
-  uint8_t* result = AllocateRow(fixed_size, varlen_size, varlen_data, status);
-  DCHECK(result != nullptr);
-  return result;
+  bool result = AddRowCustom(size, write_fn, status);
+  DCHECK(result);
+  if (size > default_page_len_) {
+    // Immediately unpin the large write page so that we're not using up extra reservation
+    // and so we don't append another row to the page.
+    ResetWritePage();
+    // Save some of the reservation we freed up so we can create the next write page when
+    // needed.
+    if (NeedWriteReservation()) {
+      buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
+    }
+  }
+  // The stream should be in a consistent state once the row is added.
+  CHECK_CONSISTENCY();
+  return true;
 }
 
-bool BufferedTupleStreamV2::DeepCopy(TupleRow* row) noexcept {
-  if (has_nullable_tuple_) {
-    return DeepCopyInternal<true>(row);
-  } else {
-    return DeepCopyInternal<false>(row);
+bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {
+  DCHECK(!closed_);
+  DCHECK(has_write_iterator());
+  if (UNLIKELY(write_page_ == nullptr || !DeepCopy(row, &write_ptr_, write_end_ptr_))) {
+    return AddRowSlow(row, status);
   }
+  ++num_rows_;
+  ++write_page_->num_rows;
+  return true;
+}
+
+bool BufferedTupleStreamV2::DeepCopy(
+    TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
+  return has_nullable_tuple_ ? DeepCopyInternal<true>(row, data, data_end) :
+                               DeepCopyInternal<false>(row, data, data_end);
 }
 
 // TODO: consider codegening this.
 // TODO: in case of duplicate tuples, this can redundantly serialize data.
 template <bool HAS_NULLABLE_TUPLE>
-bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
-  if (UNLIKELY(write_page_ == nullptr)) return false;
-  DCHECK(write_page_->is_pinned()) << DebugString() << std::endl
-                                   << write_page_->DebugString();
-
+bool BufferedTupleStreamV2::DeepCopyInternal(
+    TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
+  uint8_t* pos = *data;
   const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
-  uint32_t bytes_remaining = write_end_ptr_ - write_ptr_;
-
-  // Move to the next page we may not have enough space to append the fixed-length part
-  // of the row.
-  if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_))) return false;
-
   // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
   // indicator.
   if (HAS_NULLABLE_TUPLE) {
-    uint8_t* null_indicators = write_ptr_;
     int null_indicator_bytes = NullIndicatorBytesPerRow();
+    if (UNLIKELY(pos + null_indicator_bytes > data_end)) return false;
+    uint8_t* null_indicators = pos;
+    pos += NullIndicatorBytesPerRow();
     memset(null_indicators, 0, null_indicator_bytes);
-    write_ptr_ += NullIndicatorBytesPerRow();
     for (int i = 0; i < tuples_per_row; ++i) {
       uint8_t* null_word = null_indicators + (i >> 3);
       const uint32_t null_pos = i & 7;
@@ -724,8 +937,9 @@ bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
       Tuple* t = row->GetTuple(i);
       const uint8_t mask = 1 << (7 - null_pos);
       if (t != nullptr) {
-        memcpy(write_ptr_, t, tuple_size);
-        write_ptr_ += tuple_size;
+        if (UNLIKELY(pos + tuple_size > data_end)) return false;
+        memcpy(pos, t, tuple_size);
+        pos += tuple_size;
       } else {
         *null_word |= mask;
       }
@@ -734,12 +948,13 @@ bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
     // If we know that there are no nullable tuples no need to set the nullability flags.
     for (int i = 0; i < tuples_per_row; ++i) {
       const int tuple_size = fixed_tuple_sizes_[i];
+      if (UNLIKELY(pos + tuple_size > data_end)) return false;
       Tuple* t = row->GetTuple(i);
       // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots)
       // is delivered, the check below should become DCHECK(t != nullptr).
       DCHECK(t != nullptr || tuple_size == 0);
-      memcpy(write_ptr_, t, tuple_size);
-      write_ptr_ += tuple_size;
+      memcpy(pos, t, tuple_size);
+      pos += tuple_size;
     }
   }
 
@@ -749,7 +964,8 @@ bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
   for (int i = 0; i < inlined_string_slots_.size(); ++i) {
     const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
     if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) return false;
+    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second, &pos, data_end)))
+      return false;
   }
 
   // Copy inlined collection slots. We copy collection data in a well-defined order so
@@ -757,48 +973,52 @@ bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
   for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
     const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
     if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-    if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) return false;
+    if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second, &pos, data_end)))
+      return false;
   }
-
-  ++num_rows_;
-  ++write_page_->num_rows;
+  *data = pos;
   return true;
 }
 
-bool BufferedTupleStreamV2::CopyStrings(
-    const Tuple* tuple, const vector<SlotDescriptor*>& string_slots) {
+bool BufferedTupleStreamV2::CopyStrings(const Tuple* tuple,
+    const vector<SlotDescriptor*>& string_slots, uint8_t** data, const uint8_t* data_end) {
   for (const SlotDescriptor* slot_desc : string_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
     const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
     if (LIKELY(sv->len > 0)) {
-      if (UNLIKELY(write_ptr_ + sv->len > write_end_ptr_)) return false;
+      if (UNLIKELY(*data + sv->len > data_end)) return false;
 
-      memcpy(write_ptr_, sv->ptr, sv->len);
-      write_ptr_ += sv->len;
+      memcpy(*data, sv->ptr, sv->len);
+      *data += sv->len;
     }
   }
   return true;
 }
 
-bool BufferedTupleStreamV2::CopyCollections(
-    const Tuple* tuple, const vector<SlotDescriptor*>& collection_slots) {
+bool BufferedTupleStreamV2::CopyCollections(const Tuple* tuple,
+    const vector<SlotDescriptor*>& collection_slots, uint8_t** data, const uint8_t* data_end) {
   for (const SlotDescriptor* slot_desc : collection_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
     const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
     const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
     if (LIKELY(cv->num_tuples > 0)) {
       int coll_byte_size = cv->num_tuples * item_desc.byte_size();
-      if (UNLIKELY(write_ptr_ + coll_byte_size > write_end_ptr_)) return false;
-      uint8_t* coll_data = write_ptr_;
+      if (UNLIKELY(*data + coll_byte_size > data_end)) return false;
+      uint8_t* coll_data = *data;
       memcpy(coll_data, cv->ptr, coll_byte_size);
-      write_ptr_ += coll_byte_size;
+      *data += coll_byte_size;
 
       if (!item_desc.HasVarlenSlots()) continue;
       // Copy variable length data when present in collection items.
       for (int i = 0; i < cv->num_tuples; ++i) {
         const Tuple* item = reinterpret_cast<Tuple*>(coll_data);
-        if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false;
-        if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false;
+        if (UNLIKELY(!CopyStrings(item, item_desc.string_slots(), data, data_end))) {
+          return false;
+        }
+        if (UNLIKELY(
+                !CopyCollections(item, item_desc.collection_slots(), data, data_end))) {
+          return false;
+        }
         coll_data += item_desc.byte_size();
       }
     }


Mime
View raw message