impala-commits mailing list archives

From: tarmstr...@apache.org
Subject: [4/5] incubator-impala git commit: IMPALA-5497: spilling hash joins that output build rows hit OOM
Date: Wed, 21 Jun 2017 22:06:48 GMT
IMPALA-5497: spilling hash joins that output build rows hit OOM

The bug is that the join tried to bring the next spilled partition into
memory while still holding onto memory from the current partition.
The fix is to return from GetNext() earlier, as soon as the output batch
is at capacity, so that the resources attached to the batch are flushed
before the next spilled partition is brought into memory.
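
As an illustration only (hypothetical stand-in types and names, not the
actual Impala classes), the shape of the fixed control flow is roughly:

  #include <cstdio>

  // Hypothetical stand-ins for Status and RowBatch.
  struct Status { bool ok = true; };
  struct RowBatch {
    int rows = 0;
    int capacity = 4;
    bool AtCapacity() const { return rows >= capacity; }
  };

  // Emits (up to) the remaining unmatched build rows into 'out_batch'.
  Status OutputUnmatchedBuild(RowBatch* out_batch) {
    // Pretend there are a few unmatched build rows left to emit.
    for (int i = 0; i < 3 && !out_batch->AtCapacity(); ++i) ++out_batch->rows;
    return Status();
  }

  // Acquires memory for the next spilled partition. Only safe once the
  // previous batch (and the memory attached to it) has been handed back.
  Status PrepareNextSpilledPartition() { return Status(); }

  // Fixed shape of the loop: bail out as soon as the batch is full instead
  // of preparing the next partition while still holding the current one's memory.
  Status GetNextSketch(RowBatch* out_batch) {
    Status status = OutputUnmatchedBuild(out_batch);
    if (!status.ok) return status;
    if (out_batch->AtCapacity()) return Status();  // early return; caller flushes resources
    return PrepareNextSpilledPartition();
  }

  int main() {
    RowBatch batch;
    Status s = GetNextSketch(&batch);
    std::printf("returned %d rows, ok=%d\n", batch.rows, s.ok);
    return s.ok ? 0 : 1;
  }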

Also reduce some of the redundancy in the loop that drives the spilling
logic and catch some dropped statuses.
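
The dropped-status fixes lean on the WARN_UNUSED_RESULT annotations added
in the diff below. As a stand-alone sketch (the macro definition here is
an assumption mirroring the usual GCC/Clang attribute, not copied from
the Impala tree), annotating Status-returning functions turns a silently
dropped status into a compiler warning:

  #if defined(__GNUC__) || defined(__clang__)
  #define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
  #else
  #define WARN_UNUSED_RESULT
  #endif

  struct Status { bool ok = true; };

  // Tagging the declaration forces callers to look at the returned Status.
  Status SpillPartition() WARN_UNUSED_RESULT;
  Status SpillPartition() { return Status(); }

  int main() {
    SpillPartition();             // warns: ignoring return value (a dropped status)
    Status s = SpillPartition();  // fine: the caller handles or propagates 's'
    return s.ok ? 0 : 1;
  }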

Testing:
The failure was originally reproduced by my IMPALA-4703 patch. I was
able to cause a query failure with the current code by reducing the
memory limit for an existing query. Before the fix it failed with memory
limits of up to 12MB; now it succeeds with 8MB or less.

Ran exhaustive build.

Change-Id: I075388d348499c5692d044ac1bc38dd8dd0b10c7
Reviewed-on: http://gerrit.cloudera.org:8080/7180
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fae36fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fae36fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fae36fc7

Branch: refs/heads/master
Commit: fae36fc77d249c6e8ac372ae47c25747f670bc9f
Parents: 266cd53
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Jun 14 09:23:35 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Jun 21 20:56:00 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-builder.cc    |  2 +-
 be/src/exec/partitioned-hash-join-builder.h     | 34 ++++----
 be/src/exec/partitioned-hash-join-node-ir.cc    |  3 +-
 be/src/exec/partitioned-hash-join-node.cc       | 37 ++++----
 be/src/exec/partitioned-hash-join-node.h        | 90 ++++++++++----------
 .../queries/QueryTest/spilling.test             |  4 +-
 6 files changed, 85 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a214f74..4a5885b 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -550,7 +550,7 @@ Status PhjBuilder::RepartitionBuildInput(
   // reservation and avoid this complication.
   while (true) {
     bool got_buffer;
-    input_probe_rows->PrepareForRead(true, &got_buffer);
+    RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
     if (got_buffer) break;
     RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index daa3969..e0393b5 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -76,7 +76,7 @@ class PhjBuilder : public DataSink {
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
-      const std::vector<TRuntimeFilterDesc>& filters);
+      const std::vector<TRuntimeFilterDesc>& filters) WARN_UNUSED_RESULT;
 
   /// Implementations of DataSink interface methods.
   virtual std::string GetName() override;
@@ -121,8 +121,8 @@ class PhjBuilder : public DataSink {
   /// 'level' is the level new partitions should be created with. This functions prepares
   /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
   /// has enough buffers preallocated to execute successfully.
-  Status RepartitionBuildInput(
-      Partition* input_partition, int level, BufferedTupleStream* input_probe_rows);
+  Status RepartitionBuildInput(Partition* input_partition, int level,
+      BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
 
   /// Returns the largest build row count out of the current hash partitions.
   int64_t LargestPartitionRows() const;
@@ -196,11 +196,11 @@ class PhjBuilder : public DataSink {
     /// pinned or the hash table could not be built due to memory pressure, sets *built
     /// to false and returns OK. Returns an error status if any other error is
     /// encountered.
-    Status BuildHashTable(bool* built);
+    Status BuildHashTable(bool* built) WARN_UNUSED_RESULT;
 
     /// Spills this partition, the partition's stream is unpinned with 'mode' and
     /// its hash table is destroyed if it was built.
-    Status Spill(BufferedTupleStream::UnpinMode mode);
+    Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
 
     bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
     BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
@@ -267,14 +267,15 @@ class PhjBuilder : public DataSink {
   /// Create and initialize a set of hash partitions for partitioning level 'level'.
   /// The previous hash partitions must have been cleared with ClearHashPartitions().
   /// After calling this, batches are added to the new partitions by calling Send().
-  Status CreateHashPartitions(int level);
+  Status CreateHashPartitions(int level) WARN_UNUSED_RESULT;
 
   /// Create a new partition in 'all_partitions_' and prepare it for writing.
-  Status CreateAndPreparePartition(int level, Partition** partition);
+  Status CreateAndPreparePartition(int level, Partition** partition) WARN_UNUSED_RESULT;
 
   /// Reads the rows in build_batch and partitions them into hash_partitions_. If
   /// 'build_filters' is true, runtime filters are populated.
-  Status ProcessBuildBatch(RowBatch* build_batch, HashTableCtx* ctx, bool build_filters);
+  Status ProcessBuildBatch(
+      RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) WARN_UNUSED_RESULT;
 
   /// Append 'row' to 'stream'. In the common case, appending the row to the stream
   /// immediately succeeds. Otherwise this function falls back to the slower path of
@@ -282,19 +283,20 @@ class PhjBuilder : public DataSink {
   /// and sets 'status' if it was unable to append the row, even after spilling
   /// partitions. This odd return convention is used to avoid emitting unnecessary code
   /// for ~Status in perf-critical code.
-  bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
+  bool AppendRow(
+      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Slow path for AppendRow() above. It is called when the stream has failed to append
   /// the row. We need to find more memory by either switching to IO-buffers, in case the
   /// stream still uses small buffers, or spilling a partition. Returns false and sets
   /// 'status' if it was unable to append the row, even after spilling partitions.
-  bool AppendRowStreamFull(
-      BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept;
+  bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
+      Status* status) noexcept WARN_UNUSED_RESULT;
 
   /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
   /// to the Spill() call for the selected partition. The current policy is to spill the
   /// largest partition. Returns non-ok status if we couldn't spill a partition.
-  Status SpillPartition(BufferedTupleStream::UnpinMode mode);
+  Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
 
   /// Tries to build hash tables for all unspilled hash partitions. Called after
   /// FlushFinal() when all build rows have been partitioned and added to the appropriate
@@ -307,13 +309,13 @@ class PhjBuilder : public DataSink {
   /// 2. in-memory. The build rows are pinned and has a hash table built. No probe
   ///     partition is created.
   /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared.
-  Status BuildHashTablesAndPrepareProbeStreams();
+  Status BuildHashTablesAndPrepareProbeStreams() WARN_UNUSED_RESULT;
 
   /// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition
   /// in 'hash_partitions_'. May spill additional partitions until it can create enough
   /// probe streams with write buffers. Returns an error if an error is encountered or
   /// if it runs out of partitions to spill.
-  Status InitSpilledPartitionProbeStreams();
+  Status InitSpilledPartitionProbeStreams() WARN_UNUSED_RESULT;
 
   /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
   /// may reference them. Also cleans up 'spilled_partition_probe_streams_'.
@@ -332,13 +334,13 @@ class PhjBuilder : public DataSink {
   /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
   /// Returns non-OK status if codegen was not possible.
   Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
+      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn) WARN_UNUSED_RESULT;
 
   /// Codegen inserting batches into a partition's hash table. Identical signature to
   /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
   Status CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
       llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
-      TPrefetchMode::type prefetch_mode);
+      TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 
   RuntimeState* const runtime_state_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 42c9f4f..2c951d1 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -242,7 +242,8 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
         remaining_capacity, status);
   } else {
     DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
-           JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
+           JoinOp == TJoinOp::LEFT_OUTER_JOIN ||
+           JoinOp == TJoinOp::FULL_OUTER_JOIN);
     return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_evals,
         num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator,
         remaining_capacity);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2751145..0ecfab2 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -414,9 +414,10 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     ht_ctx_->set_level(next_partition_level);
 
     // Spill to free memory from hash tables and pinned streams for use in new partitions.
-    build_partition->Spill(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
     // Temporarily free up the probe buffer to use when repartitioning.
-    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(
+        input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
     DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -507,19 +508,14 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
       // Flush the remaining unmatched build rows of any partitions we are done
       // processing before moving onto the next partition.
-      OutputUnmatchedBuild(out_batch);
-      if (!output_build_partitions_.empty()) break;
+      RETURN_IF_ERROR(OutputUnmatchedBuild(out_batch));
+      if (out_batch->AtCapacity()) break;
 
-      // Finished outputting unmatched build rows, move to next partition.
+      // Stopped before batch was at capacity: - we must have finished outputting
+      // unmatched build rows.
+      DCHECK(output_build_partitions_.empty());
       DCHECK_EQ(builder_->num_hash_partitions(), 0);
       DCHECK(probe_hash_partitions_.empty());
-      bool got_partition;
-      RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
-      if (!got_partition) {
-        *eos = true;
-        break;
-      }
-      if (out_batch->AtCapacity()) break;
     }
 
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
@@ -609,13 +605,11 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       if (out_batch->AtCapacity()) break;
     }
 
-    if (!output_build_partitions_.empty()) {
-      DCHECK(NeedToProcessUnmatchedBuildRows());
-      // There are some partitions that need to flush their unmatched build rows.
-      OutputUnmatchedBuild(out_batch);
-      if (!output_build_partitions_.empty()) break;
-    }
-    // Move onto the next spilled partition.
+    // There are some partitions that need to flush their unmatched build rows.
+    if (!output_build_partitions_.empty()) continue;
+
+    // We get this far if there is nothing left to return from the current partition.
+    // Move to the next spilled partition.
     bool got_partition;
     RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
     if (got_partition) continue; // Probe the spilled partition.
@@ -651,7 +645,7 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
     RETURN_IF_ERROR(OutputAllBuild(out_batch));
   } else {
     // We built and processed the hash table, so sweep over it and output unmatched rows.
-    RETURN_IF_ERROR(OutputUnmatchedBuildFromHashTable(out_batch));
+    OutputUnmatchedBuildFromHashTable(out_batch);
   }
 
   num_rows_returned_ += out_batch->num_rows() - start_num_rows;
@@ -704,7 +698,7 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
+void PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
   ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
   const int num_conjuncts = conjuncts_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
@@ -738,7 +732,6 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_
           output_build_partitions_.front()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
     }
   }
-  return Status::OK();
 }
 
 void PartitionedHashJoinNode::OutputBuildRow(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index d6f6f18..68177ad 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -159,7 +159,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Called after the builder has partitioned the build rows and built hash tables,
   /// either in the initial build step, or after repartitioning a spilled partition.
   /// After this function returns, all partitions are ready to process probe rows.
-  Status PrepareForProbe();
+  Status PrepareForProbe() WARN_UNUSED_RESULT;
 
   /// Creates an initialized probe partition at 'partition_idx' in
   /// 'probe_hash_partitions_'.
@@ -171,7 +171,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Returns false and sets 'status' to an error if an error is encountered. This odd
   /// return convention is used to avoid emitting unnecessary code for ~Status in perf-
   /// critical code.
-  bool AppendProbeRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
+  bool AppendProbeRow(
+      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns true
@@ -183,9 +184,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Using a separate variable is probably faster than calling
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
   bool inline ProcessProbeRowInnerJoin(
-      ScalarExprEvaluator* const* other_join_conjunct_evals,
-      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
-      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals, int num_other_join_conjuncts,
+      ScalarExprEvaluator* const* conjunct_evals, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) WARN_UNUSED_RESULT;
 
   /// Probes and updates the hash table for the current probe row for either
   /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
@@ -199,11 +200,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// the output batch. It's updated as rows are added to the output batch.
   /// Using a separate variable is probably faster than calling
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
-  template<int const JoinOp>
+  template <int const JoinOp>
   bool inline ProcessProbeRowRightSemiJoins(
-      ScalarExprEvaluator* const* other_join_conjunct_evals,
-      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
-      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals, int num_other_join_conjuncts,
+      ScalarExprEvaluator* const* conjunct_evals, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) WARN_UNUSED_RESULT;
 
   /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
   /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
@@ -216,12 +217,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// the output batch. It's updated as rows are added to the output batch.
   /// Using a separate variable is probably faster than calling
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
-  template<int const JoinOp>
+  template <int const JoinOp>
   bool inline ProcessProbeRowLeftSemiJoins(
-      ScalarExprEvaluator* const* other_join_conjunct_evals,
-      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
-      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
-      Status* status);
+      ScalarExprEvaluator* const* other_join_conjunct_evals, int num_other_join_conjuncts,
+      ScalarExprEvaluator* const* conjunct_evals, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
+      Status* status) WARN_UNUSED_RESULT;
 
   /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
   /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
@@ -235,21 +236,20 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Using a separate variable is probably faster than calling
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
   /// 'status' may be updated if appending to null aware BTS fails.
-  template<int const JoinOp>
+  template <int const JoinOp>
   bool inline ProcessProbeRowOuterJoins(
-      ScalarExprEvaluator* const* other_join_conjunct_evals,
-      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
-      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals, int num_other_join_conjuncts,
+      ScalarExprEvaluator* const* conjunct_evals, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) WARN_UNUSED_RESULT;
 
   /// Probes 'current_probe_row_' against the the hash tables and append outputs
   /// to output batch. Wrapper around the join-type specific probe row functions
   /// declared above.
-  template<int const JoinOp>
-  bool inline ProcessProbeRow(
-      ScalarExprEvaluator* const* other_join_conjunct_evals,
+  template <int const JoinOp>
+  bool inline ProcessProbeRow(ScalarExprEvaluator* const* other_join_conjunct_evals,
       int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
       int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
-      Status* status);
+      Status* status) WARN_UNUSED_RESULT;
 
   /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
   /// and hashes the results to 32-bit hash values. The evaluation results and the hash
@@ -265,10 +265,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
   /// next probe row and its corresponding partition. 'status' may be updated if
   /// append to the spilled partitions' BTS or null probe rows' BTS fail.
-  template<int const JoinOp>
-  bool inline NextProbeRow(
-      HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-      int* remaining_capacity, Status* status);
+  template <int const JoinOp>
+  bool inline NextProbeRow(HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
+      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
 
   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
@@ -287,14 +286,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Used when NeedToProcessUnmatchedBuildRows() is true. Writes all unmatched rows from
   /// 'output_build_partitions_' to 'out_batch', up to 'out_batch' capacity.
-  Status OutputUnmatchedBuild(RowBatch* out_batch);
+  Status OutputUnmatchedBuild(RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Called by OutputUnmatchedBuild() when there isn't a hash table built, which happens
   /// when a spilled partition had 0 probe rows. In this case, all of the build rows are
   /// unmatched and we can iterate over the entire build side of the partition, which will
   /// be the only partition in 'output_build_partitions_'. If it reaches the end of the
   /// partition, it closes that partition and removes it from 'output_build_partitions_'.
-  Status OutputAllBuild(RowBatch* out_batch);
+  Status OutputAllBuild(RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Called by OutputUnmatchedBuild when there is a hash table built. Sweeps the
   /// 'hash_tbl_' of the partition that is at the front of 'output_build_partitions_',
@@ -302,7 +301,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// end of the hash table it closes that partition, removes it from
   /// 'output_build_partitions_' and moves 'hash_tbl_iterator_' to the beginning of the
   /// new partition at the front of 'output_build_partitions_'.
-  Status OutputUnmatchedBuildFromHashTable(RowBatch* out_batch);
+  void OutputUnmatchedBuildFromHashTable(RowBatch* out_batch);
 
   /// Writes 'build_row' to 'out_batch' at the position of 'out_batch_iterator' in a
   /// 'join_op_' specific way.
@@ -310,31 +309,33 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
       RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator);
 
   /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
-  Status InitNullAwareProbePartition();
+  Status InitNullAwareProbePartition() WARN_UNUSED_RESULT;
 
   /// Initializes 'null_probe_rows_' and prepares that stream for writing.
-  Status InitNullProbeRows();
+  Status InitNullProbeRows() WARN_UNUSED_RESULT;
 
   /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
-  Status PrepareNullAwarePartition();
+  Status PrepareNullAwarePartition() WARN_UNUSED_RESULT;
 
   /// Continues processing from null_aware_partition_. Called after we have finished
   /// processing all build and probe input (including repartitioning them).
-  Status OutputNullAwareProbeRows(RuntimeState* state, RowBatch* out_batch);
+  Status OutputNullAwareProbeRows(
+      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Evaluates all other_join_conjuncts against null_probe_rows_ with all the
   /// rows in build. This updates matched_null_probe_, short-circuiting if one of the
   /// conjuncts pass (i.e. there is a match).
   /// This is used for NAAJ, when there are NULL probe rows.
-  Status EvaluateNullProbe(BufferedTupleStream* build);
+  Status EvaluateNullProbe(BufferedTupleStream* build) WARN_UNUSED_RESULT;
 
   /// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
   /// matched_null_probe_ should have been fully evaluated.
-  Status PrepareNullAwareNullProbe();
+  Status PrepareNullAwareNullProbe() WARN_UNUSED_RESULT;
 
   /// Outputs NULLs on the probe side, returning rows where matched_null_probe_[i] is
   /// false. Used for NAAJ.
-  Status OutputNullAwareNullProbe(RuntimeState* state, RowBatch* out_batch);
+  Status OutputNullAwareNullProbe(
+      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Call at the end of consuming the probe rows. Cleans up the build and probe hash
   /// partitions and:
@@ -345,15 +346,16 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   ///    unmatched rows.
   ///  - If the build partition did not have a hash table, meaning both build and probe
   ///    rows were spilled, move the partition to 'spilled_partitions_'.
-  Status CleanUpHashPartitions(RowBatch* batch);
+  Status CleanUpHashPartitions(RowBatch* batch) WARN_UNUSED_RESULT;
 
   /// Get the next row batch from the probe (left) side (child(0)). If we are done
   /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
-  Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
+  Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Get the next probe row batch from 'input_partition_'. If we are done consuming the
   /// input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
-  Status NextSpilledProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
+  Status NextSpilledProbeRowBatch(
+      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Moves onto the next spilled partition and initializes 'input_partition_'. This
   /// function processes the entire build side of 'input_partition_' and when this
@@ -364,7 +366,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// 'builder->hash_partitions_' and prepare for repartitioning the partition's probe
   /// rows. If there are no probe rows, we just prepare the build side to be read by
   /// OutputUnmatchedBuild().
-  Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
+  Status PrepareSpilledPartitionForProbe(
+      RuntimeState* state, bool* got_partition) WARN_UNUSED_RESULT;
 
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
   /// references to the partitions. Also closes and destroys 'null_probe_rows_'.
@@ -374,12 +377,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   void ResetForProbe();
 
   /// Codegen function to create output row. Assumes that the probe row is non-NULL.
-  Status CodegenCreateOutputRow(LlvmCodeGen* codegen, llvm::Function** fn);
+  Status CodegenCreateOutputRow(
+      LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
 
   /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
   /// Returns non-OK if codegen was not possible.
   Status CodegenProcessProbeBatch(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode);
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 
   /// Returns the current state of the partition as a string.
   std::string PrintState() const;
@@ -508,7 +512,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// not fail with out of memory if this succeeds. Returns an error if the first read
     /// block cannot be acquired. "delete_on_read" mode is used, so the blocks backing
     /// the buffered tuple stream will be destroyed after reading.
-    Status PrepareForRead();
+    Status PrepareForRead() WARN_UNUSED_RESULT;
 
     /// Close the partition and attach resources to 'batch' if non-NULL or free the
     /// resources if 'batch' is NULL. Idempotent.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fae36fc7/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index aa524a3..a08de58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -664,7 +664,7 @@ row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
 # spilled partition with 0 probe rows, RIGHT OUTER JOIN
-set max_block_mgr_memory=100m;
+set max_block_mgr_memory=10m;
 select straight_join count(*)
 from
 supplier right outer join lineitem on s_suppkey = l_suppkey
@@ -691,7 +691,7 @@ BIGINT
 row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
-# IMPALA-5171: spilling hash join feeding into right side of nested loop join.
+# IMPALA-5173: spilling hash join feeding into right side of nested loop join.
 # Equivalent to:
 #   select *
 #   from lineitem

