impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [2/3] incubator-impala git commit: IMPALA-5307: Part 4: copy out uncompressed text and seq
Date Wed, 08 Nov 2017 22:38:43 GMT
IMPALA-5307: Part 4: copy out uncompressed text and seq

This is the final patch for IMPALA-5307.

Text and Seq scanners are converted to use the same approach
as Avro.

contains_tuple_data is now false so a bunch of dead code in
ScannerContext can be removed. We also no longer attach I/O
buffers to row batches so that logic can be removed.

Testing:
Ran core ASAN tests.

Perf:
I reran the same benchmarks as in Part 2. There was no measurable
difference before and after - for both text and seq processing time
is dominated by text parsing.

Change-Id: I304fd002b61bfedf41c8b1405cd7eb7b492bb941
Reviewed-on: http://gerrit.cloudera.org:8080/8172
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19c17e64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19c17e64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19c17e64

Branch: refs/heads/master
Commit: 19c17e64b54fcc0a599eadaa14d35943ee703ed4
Parents: 73d1bc3
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Tue Sep 26 15:27:30 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Nov 8 10:09:25 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py |  2 +-
 be/src/exec/base-sequence-scanner.cc  |  6 +--
 be/src/exec/exec-node.cc              |  9 +----
 be/src/exec/exec-node.h               |  8 +---
 be/src/exec/hdfs-avro-scanner-ir.cc   |  4 +-
 be/src/exec/hdfs-avro-scanner.cc      |  5 ---
 be/src/exec/hdfs-avro-scanner.h       |  6 ---
 be/src/exec/hdfs-parquet-scanner.cc   |  9 ++---
 be/src/exec/hdfs-rcfile-scanner.cc    |  4 --
 be/src/exec/hdfs-scan-node-base.h     |  6 ---
 be/src/exec/hdfs-scan-node-mt.cc      |  2 -
 be/src/exec/hdfs-scan-node.cc         |  9 +----
 be/src/exec/hdfs-scanner-ir.cc        | 16 ++++++--
 be/src/exec/hdfs-scanner.cc           | 20 +++++++++-
 be/src/exec/hdfs-scanner.h            | 27 +++++++++-----
 be/src/exec/hdfs-sequence-scanner.cc  | 22 ++++++++---
 be/src/exec/hdfs-text-scanner.cc      | 28 ++++++--------
 be/src/exec/parquet-column-readers.cc |  2 +-
 be/src/exec/scanner-context.cc        | 32 +++-------------
 be/src/exec/scanner-context.h         | 47 +++++++++--------------
 be/src/exec/text-converter.cc         |  4 +-
 be/src/runtime/row-batch.cc           | 28 +++-----------
 be/src/runtime/row-batch.h            | 60 +++++++++++++-----------------
 23 files changed, 144 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 28e0a71..b3ad25d 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -174,7 +174,7 @@ ir_functions = [
   ["HDFS_SCANNER_INIT_TUPLE",
    "_ZN6impala11HdfsScanner9InitTupleEPNS_5TupleES2_"],
   ["HDFS_SCANNER_WRITE_ALIGNED_TUPLES",
-   "_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEiPNS_13FieldLocationEiiii"],
+   "_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEPNS_13FieldLocationEiiiib"],
   ["PROCESS_SCRATCH_BATCH",
    "_ZN6impala18HdfsParquetScanner19ProcessScratchBatchEPNS_8RowBatchE"],
   ["PARQUET_SCANNER_EVAL_RUNTIME_FILTER",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index c22f18d..fcf58c6 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -98,9 +98,6 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
     only_parsing_header_ = true;
     return Status::OK();
   }
-
-  // If the file is compressed, the buffers in the stream_ are not used directly.
-  if (header_->is_compressed) stream_->set_contains_tuple_data(false);
   RETURN_IF_ERROR(InitNewRange());
 
   // Skip to the first record
@@ -128,7 +125,6 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
-    context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
         unique_ptr<RowBatch>(row_batch));
@@ -136,8 +132,8 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
   } else {
     data_buffer_pool_->FreeAll();
     template_tuple_pool_->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
   }
+  context_->ReleaseCompletedResources(true);
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index aaca8be..afb6249 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -103,21 +103,14 @@ unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
   return unique_ptr<RowBatch>();
 }
 
-int ExecNode::RowBatchQueue::Cleanup() {
-  int num_io_buffers = 0;
-
+void ExecNode::RowBatchQueue::Cleanup() {
   unique_ptr<RowBatch> batch = NULL;
   while ((batch = GetBatch()) != NULL) {
-    num_io_buffers += batch->num_io_buffers();
     batch.reset();
   }
 
   lock_guard<SpinLock> l(lock_);
-  for (const unique_ptr<RowBatch>& row_batch: cleanup_queue_) {
-    num_io_buffers += row_batch->num_io_buffers();
-  }
   cleanup_queue_.clear();
-  return num_io_buffers;
 }
 
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a0ed352..187d9a7 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -81,7 +81,6 @@ class ExecNode {
   virtual void Codegen(RuntimeState* state);
 
   /// Performs any preparatory work prior to calling GetNext().
-  /// Caller must not be holding any io buffers. This will cause deadlock.
   /// If overridden in subclass, must first call superclass's Open().
   /// Open() is called after Prepare() or Reset(), i.e., possibly multiple times
   /// throughout the lifetime of this node.
@@ -91,7 +90,7 @@ class ExecNode {
   /// child before acquiring their own resources to reduce the peak resource requirement.
   /// This is particularly important if there are multiple blocking ExecNodes in a
   /// pipeline because the lower nodes will release resources in Close() before the
-  /// Open() of their parent retuns.  The resource profile calculation in the frontend
+  /// Open() of their parent returns. The resource profile calculation in the frontend
   /// relies on this when computing the peak resources required for a query.
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
 
@@ -106,8 +105,6 @@ class ExecNode {
   /// In other words, if the memory holding the tuple data will be referenced
   /// by the callee in subsequent GetNext() calls, it must *not* be attached to the
   /// row_batch's tuple_data_pool.
-  /// Caller must not be holding any io buffers. This will cause deadlock.
-  /// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
   virtual Status GetNext(
       RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
 
@@ -274,8 +271,7 @@ class ExecNode {
 
     /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
     /// after this is called.
-    /// Returns the number of io buffers that were released (for debug tracking)
-    int Cleanup();
+    void Cleanup();
 
    private:
     /// Lock protecting cleanup_queue_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index a2bf606..ffd5774 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -37,7 +37,7 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat
   // If the file is uncompressed, StringValues will have pointers into the I/O buffers.
   // We don't attach I/O buffers to output batches so need to copy out data referenced
   // by tuples that survive conjunct evaluation.
-  const bool copy_out_strings = !header_->is_compressed && !string_slot_offsets_.empty();
+  const bool copy_strings = !header_->is_compressed && !string_slot_offsets_.empty();
   int num_to_commit = 0;
   for (int i = 0; i < max_tuples; ++i) {
     InitTuple(template_tuple_, tuple);
@@ -47,7 +47,7 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat
     }
     tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
     if (EvalConjuncts(tuple_row)) {
-      if (copy_out_strings) {
+      if (copy_strings) {
         if (UNLIKELY(!tuple->CopyStrings("HdfsAvroScanner::DecodeAvroData()",
               state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
               &parse_status_))) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index cad74e6..dea51a9 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -66,10 +66,6 @@ static Status CheckSchema(const AvroSchemaElement& avro_schema) {
 
 HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : BaseSequenceScanner(scan_node, state) {
-  for (SlotDescriptor* string_slot : scan_node_->tuple_desc()->string_slots()) {
-    string_slot_offsets_.push_back(
-        {string_slot->null_indicator_offset(), string_slot->tuple_offset()});
-  }
 }
 
 HdfsAvroScanner::HdfsAvroScanner()
@@ -80,7 +76,6 @@ HdfsAvroScanner::HdfsAvroScanner()
 Status HdfsAvroScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
   RETURN_IF_ERROR(CheckSchema(scan_node_->avro_schema()));
-  stream_->set_contains_tuple_data(false); // Avro scanner always copies out data.
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index ab5746a..e2260a6 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -130,12 +130,6 @@ class HdfsAvroScanner : public BaseSequenceScanner {
     bool use_codegend_decode_avro_data;
   };
 
-  /// Offsets of string slots in the result tuple that may need to be copied as part of
-  /// tuple materialization. Populated in constructor. This is redundant with offset
-  /// information stored in the TupleDescriptor but storing only the required metadata
-  /// in a simple array of struct simplifies codegen and speeds up interpretation.
-  std::vector<SlotOffsets> string_slot_offsets_;
-
   AvroFileHeader* avro_header_ = nullptr;
 
   /// Current data block after decompression with its end and length.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e35f64a..2897287 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -164,7 +164,6 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
 
 Status HdfsParquetScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
-  stream_->set_contains_tuple_data(false);
   metadata_range_ = stream_->scan_range();
   num_cols_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
@@ -228,7 +227,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -265,7 +264,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   } else {
     template_tuple_pool_->FreeAll();
     dictionary_pool_.get()->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
+    context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
     // Open() fails or if the query is cancelled.
@@ -731,7 +730,7 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   DCHECK(row_batch != nullptr);
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);
   for (ParquetColumnReader* col_reader : column_readers_) {
     col_reader->Close(row_batch);
   }
@@ -1676,8 +1675,6 @@ Status HdfsParquetScanner::InitColumns(
     DCHECK(stream != NULL);
 
     RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
-    // Parquet column readers never return tuple data with pointers into I/O buffers.
-    stream->set_contains_tuple_data(false);
   }
   DCHECK_EQ(col_ranges.size(), num_scalar_readers);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 2ea8229..a706c3d 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -77,10 +77,6 @@ Status HdfsRCFileScanner::InitNewRange() {
   // ptrs into the decompressed data).
   reuse_row_group_buffer_ = scan_node_->tuple_desc()->string_slots().empty();
 
-  // The scanner currently copies all the column data out of the io buffer so the
-  // stream never contains any tuple data.
-  stream_->set_contains_tuple_data(false);
-
   if (header_->is_compressed) {
     RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr,
         reuse_row_group_buffer_, header_->codec, &decompressor_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index f6e010a..6a0abde 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -421,12 +421,6 @@ class HdfsScanNodeBase : public ScanNode {
   AtomicInt32 num_scanners_codegen_enabled_;
   AtomicInt32 num_scanners_codegen_disabled_;
 
-  /// This is the number of io buffers that are owned by the scan node and the scanners.
-  /// This is used just to help debug leaked io buffers to determine if the leak is
-  /// happening in the scanners vs other parts of the execution.
-  /// TODO: Remove this counter when deprecating the multi-threaded scan node.
-  AtomicInt32 num_owned_io_buffers_;
-
   /// If true, counters are actively running and need to be reported in the runtime
   /// profile.
   bool counters_running_ = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 8d4efec..6803b69 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -103,7 +103,6 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   if (!status.ok()) {
     scanner_->Close(row_batch);
     scanner_.reset();
-    num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
     return status;
   }
   InitNullCollectionValues(row_batch);
@@ -119,7 +118,6 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     *eos = true;
   }
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-  num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
 
   if (*eos) StopAndFinalizeCounters();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 8cc85ff..8be191f 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -115,7 +115,6 @@ Status HdfsScanNode::GetNextInternal(
   *eos = false;
   unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
   if (materialized_batch != NULL) {
-    num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
     row_batch->AcquireState(materialized_batch.get());
     // Update the number of materialized rows now instead of when they are materialized.
     // This means that scanners might process and queue up more rows than are necessary
@@ -133,7 +132,6 @@ Status HdfsScanNode::GetNextInternal(
       *eos = true;
       SetDone();
     }
-    DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
     materialized_batch.reset();
     return Status::OK();
   }
@@ -229,16 +227,11 @@ Status HdfsScanNode::Open(RuntimeState* state) {
 void HdfsScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SetDone();
-
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
-
   scanner_threads_.JoinAll();
-
-  num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
-  DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
-
+  materialized_row_batches_->Cleanup();
   HdfsScanNodeBase::Close(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 867ce1e..d7c2d81 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -19,6 +19,7 @@
 #include "runtime/row-batch.h"
 #include "util/string-parser.h"
 #include "runtime/string-value.inline.h"
+#include "runtime/tuple.h"
 
 #include "common/names.h"
 
@@ -33,9 +34,9 @@ using namespace impala;
 // This function takes more arguments than are strictly necessary (they could be
 // computed inside this function) but this is done to minimize the clang dependencies,
 // specifically, calling function on the scan node.
-int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_size,
+int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row,
     FieldLocation* fields, int num_tuples, int max_added_tuples,
-    int slots_per_tuple, int row_idx_start) {
+    int slots_per_tuple, int row_idx_start, bool copy_strings) {
   DCHECK(tuple_ != NULL);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);
@@ -53,9 +54,16 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
     // function.
     if (WriteCompleteTuple(pool, fields, tuple, tuple_row, template_tuple_,
           error, &error_in_row)) {
+      if (copy_strings) {
+        if (UNLIKELY(!tuple->CopyStrings("HdfsScanner::WriteAlignedTuples()",
+              state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
+              &parse_status_))) {
+          return -1;
+        }
+      }
       ++tuples_returned;
-      tuple_mem += tuple_byte_size_;
-      tuple_row_mem += row_size;
+      tuple_mem += tuple_byte_size();
+      tuple_row_mem += sizeof(Tuple*);
       tuple = reinterpret_cast<Tuple*>(tuple_mem);
       tuple_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 4107e42..959aa91 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -48,6 +48,12 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
       data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
+  DCHECK_EQ(1, scan_node->row_desc()->tuple_descriptors().size())
+      << "All HDFS scanners assume one tuple per row";
+  for (SlotDescriptor* string_slot : scan_node_->tuple_desc()->string_slots()) {
+    string_slot_offsets_.push_back(
+        {string_slot->null_indicator_offset(), string_slot->tuple_offset()});
+  }
 }
 
 HdfsScanner::HdfsScanner()
@@ -188,7 +194,7 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   // which can happen if the query is very selective. We need to release memory even
   // if no rows passed predicates.
   if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
-    context_->ReleaseCompletedResources(row_batch, /* done */ false);
+    context_->ReleaseCompletedResources(/* done */ false);
   }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
@@ -524,6 +530,18 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
       "WriteCompleteTuple");
   DCHECK_EQ(replaced, 1);
 
+  Function* copy_strings_fn;
+  RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
+      codegen, *node->tuple_desc(), &copy_strings_fn));
+  replaced = codegen->ReplaceCallSites(
+      write_tuples_fn, copy_strings_fn, "CopyStrings");
+  DCHECK_EQ(replaced, 1);
+
+  int tuple_byte_size = node->tuple_desc()->byte_size();
+  replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
+      codegen->GetIntConstant(TYPE_INT, tuple_byte_size), "tuple_byte_size");
+  DCHECK_EQ(replaced, 1);
+
   *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn);
   if (*write_aligned_tuples_fn == NULL) {
     return Status("Failed to finalize write_aligned_tuples_fn.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index cb69bbe..c603593 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -96,7 +96,7 @@ struct FieldLocation {
 //
 /// This class also encapsulates row batch management.  Subclasses should call CommitRows()
 /// after writing to the current row batch, which handles creating row batches, attaching
-/// resources (IO buffers and mem pools) to the current row batch, and passing row batches
+/// resources (buffers and mem pools) to the current row batch, and passing row batches
 /// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory
 /// management.
 /// TODO: Have a pass over all members and move them out of the base class if sensible
@@ -137,7 +137,7 @@ class HdfsScanner {
   /// queue. Only valid to call if HasRowBatchQueue().
   void Close();
 
-  /// Transfers the ownership of memory backing returned tuples such as IO buffers
+  /// Transfers the ownership of memory backing returned tuples such as buffers
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
@@ -271,13 +271,19 @@ class HdfsScanner {
   /// decompressor and any other per data block allocations.
   boost::scoped_ptr<MemPool> data_buffer_pool_;
 
+  /// Offsets of string slots in the result tuple that may need to be copied as part of
+  /// tuple materialization. Populated in constructor. This is redundant with offset
+  /// information stored in the TupleDescriptor but storing only the required metadata
+  /// in a simple array of struct simplifies codegen and speeds up interpretation.
+  std::vector<SlotOffsets> string_slot_offsets_;
+
   /// Time spent decompressing bytes.
   RuntimeProfile::Counter* decompress_timer_ = nullptr;
 
   /// Matching typedef for WriteAlignedTuples for codegen.  Refer to comments for
   /// that function.
-  typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, int, FieldLocation*,
-      int, int, int, int);
+  typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, FieldLocation*,
+      int, int, int, int, bool);
   /// Jitted write tuples function pointer.  Null if codegen is disabled.
   WriteTuplesFn write_tuples_fn_ = nullptr;
 
@@ -332,15 +338,18 @@ class HdfsScanner {
   /// - 'fields' must start at the beginning of a tuple.
   /// - 'num_tuples' number of tuples to process
   /// - 'max_added_tuples' the maximum number of tuples that should be added to the batch.
-  /// - 'row_start_index' is the number of rows that have already been processed
+  /// - 'row_idx_start' is the number of rows that have already been processed
   ///   as part of WritePartialTuple.
+  /// - 'copy_strings': if true, strings in returned tuples that pass conjuncts are
+  ///   copied into 'pool'
   /// Returns the number of tuples added to the row batch.  This can be less than
   /// num_tuples/tuples_till_limit because of failed conjuncts.
-  /// Returns -1 if parsing should be aborted due to parse errors.
+  /// Returns -1 if an error is encountered, e.g. a parse error or a memory allocation
+  /// error.
   /// Only valid to call if the parent scan node is multi-threaded.
-  int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
-      FieldLocation* fields, int num_tuples,
-      int max_added_tuples, int slots_per_tuple, int row_start_indx);
+  int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, FieldLocation* fields,
+      int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start,
+      bool copy_strings);
 
   /// Update the decompressor_ object given a compression type or codec name. Depending on
   /// the old compression type and the new one, it may close the old decompressor and/or

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 9b66432..1c248bb 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -266,6 +266,9 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
 
   // Materialize parsed cols to tuples
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+
+  // Need to copy out strings if they may reference the original I/O buffer.
+  const bool copy_strings = !header_->is_compressed && !string_slot_offsets_.empty();
   // Call jitted function if possible
   int tuples_returned;
   if (write_tuples_fn_ != nullptr) {
@@ -275,12 +278,12 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
         delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
     tuples_returned = write_tuples_fn_(this, row_batch->tuple_data_pool(), tuple_row,
-        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
-        max_added_tuples, scan_node_->materialized_slots().size(), 0);
+        field_locations_.data(), num_to_process,
+        max_added_tuples, scan_node_->materialized_slots().size(), 0, copy_strings);
   } else {
     tuples_returned = WriteAlignedTuples(row_batch->tuple_data_pool(), tuple_row,
-        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
-        max_added_tuples, scan_node_->materialized_slots().size(), 0);
+        field_locations_.data(), num_to_process,
+        max_added_tuples, scan_node_->materialized_slots().size(), 0, copy_strings);
   }
 
   if (tuples_returned == -1) return parse_status_;
@@ -301,6 +304,7 @@ Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
   int64_t num_rows_read = 0;
 
+  const bool copy_strings = !seq_header->is_compressed && !string_slot_offsets_.empty();
   const bool has_materialized_slots = !scan_node_->materialized_slots().empty();
   while (!eos_) {
     DCHECK_GT(record_locations_.size(), 0);
@@ -323,12 +327,20 @@ Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
       uint8_t error_in_row = false;
       uint8_t errors[num_fields];
       memset(errors, 0, num_fields);
-      add_row = WriteCompleteTuple(row_batch->tuple_data_pool(), field_locations_.data(),
+      MemPool* pool = row_batch->tuple_data_pool();
+      add_row = WriteCompleteTuple(pool, field_locations_.data(),
           tuple_, tuple_row_mem, template_tuple_, &errors[0], &error_in_row);
       if (UNLIKELY(error_in_row)) {
         ReportTupleParseError(field_locations_.data(), errors);
         RETURN_IF_ERROR(parse_status_);
       }
+      if (add_row && copy_strings) {
+        if (UNLIKELY(!tuple_->CopyStrings("HdfsSequenceScanner::ProcessRange()",
+              state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
+              &parse_status_))) {
+          return parse_status_;
+        }
+      }
     } else {
       add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f1c1ff5..1cbc6f5 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -167,7 +167,6 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
-    context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
@@ -175,8 +174,8 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
   } else {
     template_tuple_pool_->FreeAll();
     data_buffer_pool_->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
   }
+  context_->ReleaseCompletedResources(true);
 
   // Verify all resources (if any) have been transferred or freed.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
@@ -192,12 +191,6 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
 
 Status HdfsTextScanner::InitNewRange() {
   DCHECK_EQ(scan_state_, CONSTRUCTED);
-  // Compressed text does not reference data in the io buffers directly. In such case, we
-  // can recycle the buffers in the stream_ more promptly.
-  if (stream_->file_desc()->file_compression != THdfsCompression::NONE) {
-    stream_->set_contains_tuple_data(false);
-  }
-
   // Update the decompressor based on the compression type of the file in the context.
   DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
       << "FE should have generated SNAPPY_BLOCKED instead.";
@@ -591,7 +584,7 @@ Status HdfsTextScanner::FillByteBufferCompressedStream(MemPool* pool, bool* eosr
 
   if (*eosr) {
     DCHECK(stream_->eosr());
-    context_->ReleaseCompletedResources(nullptr, true);
+    context_->ReleaseCompletedResources(true);
   }
 
   return Status::OK();
@@ -637,7 +630,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
       &decompressed_buffer));
 
   // Inform 'stream_' that the buffer with the compressed text can be released.
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);
 
   VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
   byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
@@ -837,6 +830,9 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
 
   // Write complete tuples.  The current field, if any, is at the start of a tuple.
   if (num_tuples > 0) {
+    // Need to copy out strings if they may reference the original I/O buffer.
+    const bool copy_strings = !string_slot_offsets_.empty() &&
+        stream_->file_desc()->file_compression == THdfsCompression::NONE;
     int max_added_tuples = (scan_node_->limit() == -1) ?
         num_tuples : scan_node_->limit() - scan_node_->rows_returned();
     int tuples_returned = 0;
@@ -846,13 +842,13 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
       // slots and escape characters. TextConverter::WriteSlot() will be used instead.
       DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
           delimited_text_parser_->escape_char() == '\0');
-      tuples_returned = write_tuples_fn_(this, pool, row, sizeof(Tuple*), fields,
-          num_tuples, max_added_tuples, scan_node_->materialized_slots().size(),
-          num_tuples_processed);
+      tuples_returned = write_tuples_fn_(this, pool, row, fields, num_tuples,
+          max_added_tuples, scan_node_->materialized_slots().size(),
+          num_tuples_processed, copy_strings);
     } else {
-      tuples_returned = WriteAlignedTuples(pool, row, sizeof(Tuple*), fields,
-          num_tuples, max_added_tuples, scan_node_->materialized_slots().size(),
-          num_tuples_processed);
+      tuples_returned = WriteAlignedTuples(pool, row, fields, num_tuples,
+          max_added_tuples, scan_node_->materialized_slots().size(),
+          num_tuples_processed, copy_strings);
     }
     if (tuples_returned == -1) return 0;
     DCHECK_EQ(slot_idx_, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 15c963d..04127e3 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -996,7 +996,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
   }
   // We don't hold any pointers to earlier pages in the stream - we can safely free
   // any accumulated I/O or boundary buffers.
-  stream_->ReleaseCompletedResources(nullptr, false);
+  stream_->ReleaseCompletedResources(false);
 
   // Read the next data page, skipping page types we don't care about.
   // We break out of this loop on the non-error case (a data page was found or we read all

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 3ed8b4a..8cb195d 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -58,9 +58,9 @@ ScannerContext::~ScannerContext() {
   DCHECK(streams_.empty());
 }
 
-void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
+void ScannerContext::ReleaseCompletedResources(bool done) {
   for (int i = 0; i < streams_.size(); ++i) {
-    streams_[i]->ReleaseCompletedResources(batch, done);
+    streams_[i]->ReleaseCompletedResources(done);
   }
 }
 
@@ -87,13 +87,11 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   stream->output_buffer_pos_ = NULL;
   stream->output_buffer_bytes_left_ =
       const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
-  stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData();
   streams_.push_back(std::move(stream));
   return streams_.back().get();
 }
 
-void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {
-  DCHECK(batch != nullptr || done || !contains_tuple_data_);
+void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
   if (done) {
     // Mark any pending resources as completed
     if (io_buffer_ != nullptr) {
@@ -108,24 +106,11 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
   }
 
   for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) {
-    if (contains_tuple_data_ && batch != nullptr) {
-      batch->AddIoBuffer(move(buffer));
-      // TODO: We can do row batch compaction here.  This is the only place io buffers are
-      // queued.  A good heuristic is to check the number of io buffers queued and if
-      // there are too many, we should compact.
-    } else {
-      ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
-      parent_->scan_node_->num_owned_io_buffers_.Add(-1);
-    }
+    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
   }
   parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
   completed_io_buffers_.clear();
 
-  if (contains_tuple_data_ && batch != nullptr) {
-    // If we're not done, keep using the last chunk allocated in boundary_pool_ so we
-    // don't have to reallocate. If we are done, transfer it to the row batch.
-    batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* keep_current */ !done);
-  }
   if (done) boundary_pool_->FreeAll();
 }
 
@@ -196,7 +181,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
         filename(), offset));
   }
 
-  parent_->scan_node_->num_owned_io_buffers_.Add(1);
   io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
   io_buffer_bytes_left_ = io_buffer_->len();
   if (io_buffer_->len() == 0) {
@@ -262,13 +246,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
   DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
   *out_buffer = NULL;
 
-  if (boundary_buffer_bytes_left_ == 0) {
-    if (contains_tuple_data_) {
-      boundary_buffer_->Reset();
-    } else {
-      boundary_buffer_->Clear();
-    }
-  }
+  if (boundary_buffer_bytes_left_ == 0) boundary_buffer_->Clear();
 
   DCHECK(ValidateBufferPointers());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 3fe14aa..216209f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -100,13 +100,6 @@ class ScannerContext {
     /// If we are past the end of the scan range, no bytes are returned.
     Status GetBuffer(bool peek, uint8_t** buffer, int64_t* out_len);
 
-    /// Sets whether of not the resulting tuples contain ptrs into memory owned by
-    /// the scanner context. This by default, is inferred from the scan_node tuple
-    /// descriptor (i.e. contains string slots) but can be overridden.  If possible,
-    /// this should be set to false to reduce memory usage as resources can be reused
-    /// and recycled more quickly.
-    void set_contains_tuple_data(bool v) { contains_tuple_data_ = v; }
-
     /// Callback that returns the buffer size to use when reading past the end of the scan
     /// range. Reading past the end of the scan range is likely a remote read, so we want
     /// find a good trade-off between io requests and data volume. Scanners that have
@@ -175,13 +168,10 @@ class ScannerContext {
     /// Skip this text object.
     bool SkipText(Status*);
 
-    /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed
-    /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight
-    /// resources are also attached or released.
-    /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' false. Such
-    /// a call will release all completed resources. If 'done' is true all in-flight
-    /// resources are also freed.
-    void ReleaseCompletedResources(RowBatch* batch, bool done);
+    /// Release all completed resources in the context, i.e. I/O and boundary buffers
+    /// that the caller has finished reading. If 'done' is true all resources are
+    /// freed, even if the caller has not read that data yet.
+    void ReleaseCompletedResources(bool done);
 
    private:
     friend class ScannerContext;
@@ -189,10 +179,6 @@ class ScannerContext {
     DiskIoMgr::ScanRange* scan_range_;
     const HdfsFileDesc* file_desc_;
 
-    /// If true, tuples will contain pointers into memory contained in this object.
-    /// That memory (io buffers or boundary buffers) must be attached to the row batch.
-    bool contains_tuple_data_;
-
     /// Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_;
 
@@ -240,7 +226,7 @@ class ScannerContext {
     /// List of buffers that are completed but still have bytes referenced by the caller.
     /// On the next GetBytes() call, these buffers are released (the caller by calling
     /// GetBytes() signals it is done with its previous bytes).  At this point the
-    /// buffers are either returned to the io mgr or attached to the current row batch.
+    /// buffers are returned to the I/O manager.
     std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_;
 
     Stream(ScannerContext* parent);
@@ -280,22 +266,23 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
-  /// If a non-NULL 'batch' is passed, attaches completed io buffers and boundary mem pools
-  /// from all streams to 'batch'. Attaching only completed resources ensures that buffers
-  /// (and their cleanup) trail the rows that reference them (row batches are consumed and
-  /// cleaned up in order by the rest of the query).
-  /// If 'done' is true, this is the final call for the current streams and any pending
-  /// resources in each stream are also passed to the row batch. Callers which want to
-  /// clear the streams from the context should also call ClearStreams().
-  ///
-  /// A NULL 'batch' may be passed to free all resources. It is only valid to pass a NULL
-  /// 'batch' when also passing 'done'.
+  /// Returns completed I/O buffers to the I/O manager. If 'done' is true, this is the
+  /// final call for the current streams and any pending resources in each stream are
+  /// also freed. Callers which want to clear the streams from the context should also
+  /// call ClearStreams().
   ///
   /// This must be called with 'done' set when the scanner is complete and no longer needs
   /// any resources (e.g. tuple memory, io buffers) returned from the current streams.
   /// After calling with 'done' set, this should be called again if new streams are
   /// created via AddStream().
-  void ReleaseCompletedResources(RowBatch* batch, bool done);
+  void ReleaseCompletedResources(bool done);
+
+  /// Overload with the signature expected by Impala-lzo to enable easier staging of
+  /// the API change. TODO: remove this once Impala-lzo is updated to use the new
+  /// signature.
+  void ReleaseCompletedResources(RowBatch* batch, bool done) {
+    ReleaseCompletedResources(done);
+  }
 
   /// Releases all the Stream objects in the vector 'streams_' and reduces the vector's
   /// size to 0.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/text-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index 0cad05c..3bb65f2 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -103,8 +103,8 @@ void TextConverter::UnescapeString(const char* src, char* dest, int* len,
 //   store i8 %null_bit_set, i8* %null_byte_ptr
 //   ret i1 false
 //}
-
-
+// TODO: convert this function to use cross-compilation + constant substitution in whole
+// or part. It is currently too complex and doesn't implement the full functionality.
 Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc, Function** fn,
     const char* null_col_val, int len, bool check_null, bool strict_mode) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index beed671..a6e935a 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -45,7 +45,7 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(row_desc->tuple_descriptors().size()),
-    auxiliary_mem_usage_(0),
+    attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
@@ -72,7 +72,7 @@ RowBatch::RowBatch(
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
-    auxiliary_mem_usage_(0),
+    attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
@@ -144,9 +144,6 @@ RowBatch::RowBatch(
 
 RowBatch::~RowBatch() {
   tuple_data_pool_.FreeAll();
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
-  }
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
@@ -285,16 +282,9 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
   DCHECK_EQ(offset, size);
 }
 
-void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) {
-  DCHECK(buffer != NULL);
-  auxiliary_mem_usage_ += buffer->buffer_len();
-  buffer->TransferOwnership(mem_tracker_);
-  io_buffers_.emplace_back(move(buffer));
-}
-
 void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
     BufferPool::BufferHandle&& buffer, FlushMode flush) {
-  auxiliary_mem_usage_ += buffer.len();
+  attached_buffer_bytes_ += buffer.len();
   BufferInfo buffer_info;
   buffer_info.client = client;
   buffer_info.buffer = std::move(buffer);
@@ -307,26 +297,18 @@ void RowBatch::Reset() {
   capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
   // TODO: Change this to Clear() and investigate the repercussions.
   tuple_data_pool_.FreeAll();
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
-  }
-  io_buffers_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
   }
   buffers_.clear();
-  auxiliary_mem_usage_ = 0;
+  attached_buffer_bytes_ = 0;
   flush_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
 }
 
 void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false);
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    dest->AddIoBuffer(move(io_buffers_[i]));
-  }
-  io_buffers_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     dest->AddBuffer(
         buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
@@ -362,7 +344,7 @@ void RowBatch::AcquireState(RowBatch* src) {
   // The destination row batch should be empty.
   DCHECK(!needs_deep_copy_);
   DCHECK_EQ(num_rows_, 0);
-  DCHECK_EQ(auxiliary_mem_usage_, 0);
+  DCHECK_EQ(attached_buffer_bytes_, 0);
 
   num_rows_ = src->num_rows_;
   capacity_ = src->capacity_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 49dd066..5a7edd4 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -43,16 +43,18 @@ class TupleDescriptor;
 
 /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
 /// The maximum number of rows is fixed at the time of construction.
-/// The row batch reference a few different sources of memory.
-///   1. TupleRow ptrs - may be malloc'd and owned by the RowBatch or allocated from
-///      the tuple pool, depending on whether legacy joins and aggs are enabled.
-///      See the comment on tuple_ptrs_ for more details.
-///   2. Tuple memory - this is allocated (or transferred to) the row batches tuple pool.
-///   3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally
-///      (don't copy strings) or from the tuple pool (strings are copied).  If external,
-///      the data is in an io buffer that may not be attached to this row batch.  The
-///      creator of that row batch has to make sure that the io buffer is not recycled
-///      until all batches that reference the memory have been consumed.
+/// The row batch can reference various types of memory.
+///   1. TupleRow ptrs - malloc'd and owned by the RowBatch. See the comment on
+///      tuple_ptrs_ for more details.
+///   2. Fixed and variable-length tuple data. This memory may be directly attached to
+///      the batch: either in the batch's MemPool or in an attached buffer. Or it may
+///      live elsewhere - either in a subsequent batch returned by an ExecNode or
+///      still be owned by the ExecNode that produced the batch. In those cases the
+///      owner of this RowBatch must be careful not to close the producing ExecNode
+///      or free resources from trailing batches while the batch's data is still being
+///      used.
+///     TODO: IMPALA-4179: simplify the ownership transfer model.
+///
 /// In order to minimize memory allocations, RowBatches and TRowBatches that have been
 /// serialized and sent over the wire should be reused (this prevents compression_scratch_
 /// from being needlessly reallocated).
@@ -62,13 +64,13 @@ class TupleDescriptor;
 /// and reference memory outside of the row batch. This results in most row batches
 /// having a very small memory footprint and in some row batches having a very large
 /// one (it contains all the memory that other row batches are referencing). An example
-/// is IoBuffers which are only attached to one row batch. Only when the row batch reaches
+/// is buffers which are only attached to one row batch. Only when the row batch reaches
 /// a blocking operator or the root of the fragment is the row batch memory freed.
 /// This means that in some cases (e.g. very selective queries), we still need to
 /// pass the row batch through the exec nodes (even if they have no rows) to trigger
-/// memory deletion. AtCapacity() encapsulates the check that we are not accumulating
-/// excessive memory.
-//
+/// memory deletion. AtCapacity() encapsulates the check that the batch does not have
+/// excessive memory attached to it.
+///
 /// A row batch is considered at capacity if all the rows are full or it has accumulated
 /// auxiliary memory up to a soft cap. (See at_capacity_mem_usage_ comment).
 class RowBatch {
@@ -138,7 +140,7 @@ class RowBatch {
     // MarkFlushResources().
     DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
         || num_rows_ == capacity_);
-    int64_t mem_usage = auxiliary_mem_usage_ + tuple_data_pool_.total_allocated_bytes();
+    int64_t mem_usage = attached_buffer_bytes_ + tuple_data_pool_.total_allocated_bytes();
     return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
   }
 
@@ -203,24 +205,18 @@ class RowBatch {
   };
 
   int num_tuples_per_row() { return num_tuples_per_row_; }
-  int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
-  int num_io_buffers() const { return io_buffers_.size(); }
   int num_buffers() const { return buffers_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.
   void Reset();
 
-  /// Add io buffer to this row batch.
-  void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer);
-
   /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
   /// The buffer's memory remains accounted against the original owner, even when the
   /// ownership of batches is transferred. If the original owner wants the memory to be
   /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources()
   /// for further explanation).
-  /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
-  /// make it consistent between buffers and I/O buffers.
+  /// TODO: IMPALA-4179: simplify the ownership transfer model.
   void AddBuffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
       FlushMode flush);
 
@@ -230,10 +226,10 @@ class RowBatch {
   /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
   /// ensures that batches are flushed by streaming operators all the way up the operator
   /// tree. Blocking operators can still accumulate batches with this flag.
-  /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
-  /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
+  /// TODO: IMPALA-4179: blocking operators should acquire all memory resources including
+  /// attached buffers, so that MarkFlushResources() can guarantee that the
   /// resources will not be accounted against the original operator (this is currently
-  /// not true for Blocks, which can't be transferred).
+  /// not true for buffers, which aren't transferred).
   void MarkFlushResources() {
     DCHECK_LE(num_rows_, capacity_);
     capacity_ = num_rows_;
@@ -256,7 +252,7 @@ class RowBatch {
   bool needs_deep_copy() { return needs_deep_copy_; }
 
   /// Transfer ownership of resources to dest.  This includes tuple data in mem
-  /// pool and io buffers.
+  /// pool and buffers.
   void TransferResourceOwnership(RowBatch* dest);
 
   void CopyRow(TupleRow* src, TupleRow* dest) {
@@ -277,7 +273,7 @@ class RowBatch {
     memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
   }
 
-  /// Acquires state from the 'src' row batch into this row batch. This includes all IO
+  /// Acquires state from the 'src' row batch into this row batch. This includes all
   /// buffers and tuple data.
   /// This row batch must be empty and have the same row descriptor as the src batch.
   /// This is used for scan nodes which produce RowBatches asynchronously.  Typically,
@@ -399,9 +395,8 @@ class RowBatch {
   int tuple_ptrs_size_;
   Tuple** tuple_ptrs_;
 
-  /// Sum of all auxiliary bytes. This includes IoBuffers and memory from
-  /// TransferResourceOwnership().
-  int64_t auxiliary_mem_usage_;
+  /// Total bytes of BufferPool buffers attached to this batch.
+  int64_t attached_buffer_bytes_;
 
   /// holding (some of the) data referenced by rows
   MemPool tuple_data_pool_;
@@ -415,11 +410,6 @@ class RowBatch {
 
   MemTracker* mem_tracker_;  // not owned
 
-  /// IO buffers current owned by this row batch. Ownership of IO buffers transfer
-  /// between row batches. Any IO buffer will be owned by at most one row batch
-  /// (i.e. they are not ref counted) so most row batches don't own any.
-  std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_;
-
   struct BufferInfo {
     BufferPool::ClientHandle* client;
     BufferPool::BufferHandle buffer;


Mime
View raw message