impala-reviews mailing list archives

From "Tim Armstrong (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder
Date Thu, 15 Sep 2016 00:24:14 GMT
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Patch Set 14:

File be/src/exec/

Line 207:         state, Substitute(PREPARE_FAILED_ERROR_MSG, "write"));
> old code and agg uses state_->block_mgr()->MemLimitTooLowError().  Any reas
Historical reasons, it looks like: the PrepareForRead() version below diverged from the PAGG/PHJ

Switched to the block_mgr version for consistency.
File be/src/exec/

PS14, Line 297: SCOPED_TIMER(build_timer_);
> We used to have two different timers in PHJ for measuring the time to hash 
It's not clear to me that the original timers were well thought out. It didn't look like there
was a timer for how long it took to build the hash table - that was lumped into 'build_timer_'.

I added two new timers with clearer meanings that track the total hash table build time and
the time spent partitioning rows. I also moved the remaining build-related timers into the Builder.

Now 'build_timer_' is the initial partitioning and hash table build, and 'repartition_timer_'
is the time spent repartitioning.
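
To illustrate the split, here is a minimal sketch of scoped timers that charge separate
counters. It is not the SCOPED_TIMER macro or the counters from the patch: Counter,
ScopedTimer, BuilderTimers and PartitionRows are hypothetical names, and std::chrono stands
in for whatever clock the runtime profile actually uses.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for a profile counter; accumulates nanoseconds.
struct Counter {
  int64_t total_ns = 0;
};

// RAII timer: adds the elapsed wall-clock time to 'counter' when it goes out of scope.
class ScopedTimer {
 public:
  explicit ScopedTimer(Counter* counter)
    : counter_(counter), start_(std::chrono::steady_clock::now()) {}
  ~ScopedTimer() {
    auto elapsed = std::chrono::steady_clock::now() - start_;
    counter_->total_ns +=
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
  }
  ScopedTimer(const ScopedTimer&) = delete;
  ScopedTimer& operator=(const ScopedTimer&) = delete;

 private:
  Counter* counter_;
  std::chrono::steady_clock::time_point start_;
};

// Hypothetical set of builder-side counters, one per kind of work (assumed names).
struct BuilderTimers {
  Counter partition_rows;     // time spent partitioning build rows
  Counter build_hash_tables;  // time spent building hash tables
  Counter repartition;        // time spent repartitioning spilled partitions
};

// Placeholder for the partitioning step; only 'partition_rows' is charged.
int64_t PartitionRows(BuilderTimers* timers, int64_t num_rows) {
  ScopedTimer timer(&timers->partition_rows);
  int64_t checksum = 0;
  for (int64_t i = 0; i < num_rows; ++i) checksum += i % 16;  // fake hashing work
  return checksum;
}
```

Each phase charges exactly one counter, so the counters can be read independently instead
of being lumped into a single build timer.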
File be/src/exec/

Line 179:     largest_fraction = max(largest_fraction,
> DCHECK_GT(num_build_rows, 0);
I don't think that's valid: num_build_rows can be zero, and the code casts to double to avoid
crashing on the divide-by-zero. I changed it to skip the calculation when num_build_rows == 0,
which avoids the problem.
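
Roughly, the guard looks like the sketch below. This is not the code from the patch:
LargestPartitionFraction and its parameter are hypothetical, and the patch computes
largest_fraction inline rather than in a helper.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns the largest fraction of build rows that landed in any
// single partition. Returns 0.0 when there are no build rows, so no division happens.
double LargestPartitionFraction(const std::vector<int64_t>& partition_rows) {
  int64_t num_build_rows = 0;
  for (int64_t rows : partition_rows) num_build_rows += rows;
  if (num_build_rows == 0) return 0.0;  // Skip the calculation entirely.

  double largest_fraction = 0.0;
  for (int64_t rows : partition_rows) {
    largest_fraction = std::max(
        largest_fraction, static_cast<double>(rows) / num_build_rows);
  }
  return largest_fraction;
}
```

With the early return, an empty build side is a normal case rather than something a DCHECK
would reject.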

PS14, Line 318: a hash table
> hash tables
Done. Both alternatives seem grammatically correct to me though.

Line 341:   // building hash tables to avoid building hash tables for partitions we'll spill
> That's an interesting comment. If I understand it correctly, it means that 
Done with some slight tweaks.

This is a lot simpler than the previous behaviour, where spilling could happen even during probing.

I didn't measure how often, but it's not at all improbable, since the probe buffers are 8MB
each and we could allocate up to 16 of them.
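
For scale, the arithmetic behind that estimate is below. The 8MB and 16 figures are the ones
quoted above; the constant and function names are hypothetical.

```cpp
#include <cstdint>

// Figures quoted in the comment above; the names are made up for illustration.
constexpr int64_t kProbeBufferBytes = 8LL * 1024 * 1024;  // 8MB per probe buffer
constexpr int kMaxProbeBuffers = 16;                      // up to 16 probe buffers

// Worst-case memory that the probe buffers alone could claim.
constexpr int64_t MaxProbeBufferBytes() {
  return kProbeBufferBytes * kMaxProbeBuffers;
}

static_assert(MaxProbeBufferBytes() == 128LL * 1024 * 1024, "16 x 8MB = 128MB");
```

So the probe side could ask for up to 128MB after the build side had already sized itself.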

Line 401:       RETURN_IF_ERROR(SpillPartition());
> May help to document that failure to find any partition to spill (e.g. all 
I added to the comment at the top of the loop to make the termination more explicit.
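
The termination argument, as a standalone sketch. This is not the loop from the patch: the
Partition struct, the "spill the largest" policy and the reserved_bytes parameter are all
assumptions made for illustration.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical partition: just a size and a spilled flag.
struct Partition {
  int64_t bytes = 0;
  bool spilled = false;
};

// Spills the largest in-memory partition (assumed policy).
// Returns false if every partition is already spilled.
bool SpillLargestPartition(std::vector<Partition>* partitions) {
  Partition* victim = nullptr;
  for (Partition& p : *partitions) {
    if (!p.spilled && (victim == nullptr || p.bytes > victim->bytes)) victim = &p;
  }
  if (victim == nullptr) return false;
  victim->spilled = true;
  return true;
}

// Total size of the partitions that are still in memory.
int64_t InMemoryBytes(const std::vector<Partition>& partitions) {
  int64_t total = 0;
  for (const Partition& p : partitions) {
    if (!p.spilled) total += p.bytes;
  }
  return total;
}

// Spills partitions until the in-memory ones plus 'reserved_bytes' fit in 'mem_limit'.
// Terminates because each iteration either exits the loop or spills one more partition,
// and there are finitely many partitions. Returns false if nothing is left to spill
// and the limit is still exceeded.
bool SpillUntilFits(std::vector<Partition>* partitions, int64_t reserved_bytes,
                    int64_t mem_limit) {
  while (InMemoryBytes(*partitions) + reserved_bytes > mem_limit) {
    if (!SpillLargestPartition(partitions)) return false;
  }
  return true;
}
```

The failure case is the one raised in the review: if no partition can be spilled, the loop
returns an error instead of spinning.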

PS14, Line 529: PhjBuilder::HashTableStoresNulls()
> This seems to belong to PartitionedHashJoinNode conceptually.
I feel like it makes sense here though since the builder owns the hash tables.

Plumbing-wise it's also easier since PhjBuilder needs this info and starts executing before

Line 646:   do {
> Please see comments in BlockingJoinNode,  it would be great to retain timer

PS14, Line 782: process_build_batch_fn_level0
> 'process_build_batch_fn_level0_'
I just copied this verbatim - it seems to be referring to the local variable though.
File be/src/exec/partitioned-hash-join-builder.h:

Line 425: 
> nit: unnecessary blank line ?
File be/src/exec/

PS14, Line 151: 
> This may be important information to retain.
Not sure why I removed it. May have had a good reason but I can't recall it.

PS14, Line 87: Expr::CreateExprTree(pool_, eq_join_conjunct.right
> Does this overlap with CreateExprTree() in PhjBuilder::Init() ? Same questi
They are redundant currently, but the idea is that PhjBuilder and PhjNode will exist more
independently in different plan fragments, so I wanted to encapsulate the state where possible.

The expr contexts need to be independent to eventually support many probe sides : 1 build
side for broadcast joins.

I think documenting this explicitly may be confusing because it is sort-of explaining the
current state of things relative to the previous way of doing things. Whereas I think with
the separate build side, the default assumption is that builder data structures are private
and not shared with the exec node.

It may make sense to share Expr trees globally between fragment instances as part of the multithreading
changes, but I don't think it's worth trying to share them here until we have the right infrastructure.

Line 213:   for (unique_ptr<ProbePartition>& partition : spilled_partitions_)
> missing { }
Ah, missed that clang-format wrapped it.

PS14, Line 494: to output
> outputting

PS14, Line 585: hash_partitions_
> 'hash_partitions_'

Line 594:       continue;
> Is there a reason why we cannot call OutputUnmatchedBuild() directly at thi
I don't see any obvious reason why not. I think we could do a lot of simplification of this
loop in general.

PS14, Line 996: s)
> next_state ?
File be/src/exec/partitioned-hash-join-node.h:

PS14, Line 437: 
> Is this merged into build_timer_ in BlockingJoinNode now ? It may be helpfu
I recreated this timer with a clearer name. I'm not convinced the original set of timers was
particularly useful for fine-grained debugging, but I think the new set should be slightly

It's kind of tricky because you can slice it up in lots of different ways: different phases
of the join algorithm x partitioning/building hash tables.

Line 79:   /// Logically, the algorithm has the following modes:.
> I think this comment could be a lot more useful if we explain how the phase
I think the comment was meant to be read after the class comment. I enhanced the class comment
and moved this part up next to it.

PS14, Line 81: partition them into hash_partitions_. The input can 
> this comment is kind of disjoint since it talks about what the phase does, 
I reworked it a bit and described the state transitions at the end of each description.

PS14, Line 81: hash_partitions_
> 'hash_partitions_'

PS14, Line 84: This is the only phase 
> These are the only phases

> why do we use PROCESSING term here rather than PARTITIONING, which would be
I changed it. I think if all hash tables are in memory we're arguably not partitioning the
probe, since all we're doing is looking up hash tables. But by the same argument we shouldn't
call it REPARTITIONING_PROBE. So it seems better to be consistent, even if arguably slightly inaccurate.

PS14, Line 87: spilled
> single spilled
Reworded it to be clearer.

PS14, Line 88: probe_hash_partitions_
> 'probe_hash_partitions_'

Line 89:   ///      table in memory, we perform the join, otherwise we spill the probe row.
> this last sentence is confusing since it mostly restates what the first say
Done - really reworded this completely.

PS14, Line 90: construct
> what does this phase construct? isn't the hash table already constructed?
Yeah, the hash table construction is essentially done as part of deciding whether to transition
into this or REPARTITIONING_BUILD. The comment should be more explicit.

> it would be good to be more upfront about when this phase is used and how, 
I think this should be a lot clearer - it's described in the initial high level algorithm
and also in the state transitions.

PS14, Line 91: walking
> what does this mean? processing? 
I got rid of all instances of 'walk' in the header comments.

Hopefully the transitions will be clearer in the new version of this comment.

PS14, Line 97: *
> what does this star mean to say? We can go from REPARTITIONING_PROBE back t
Yeah, it made sense to me as a regular expression. I got rid of this since the transitions
are documented elsewhere.

PS14, Line 112: input_partition_
> 'input_partition_'

PS14, Line 118: input_partition_
> 'input_partition_'

PS14, Line 143: buffer
> write buffer?

PS14, Line 264: the
> its

PS14, Line 267: the
> that

PS14, Line 291: Walks
> What does this mean. Iterates over?
Yeah, not my wording - rephrased it.

PS14, Line 291: hash partitions
> is this talking about the probe hash partitions or the builder's?
Both in sync. Updated.

PS14, Line 298: spilled_partitions_
> 'spilled_partitions_'

PS14, Line 302: probe_batch_pos_
> 'probe_batch_pos_'

PS14, Line 306: probe_batch_pos_
> 'probe_batch_pos_'

PS14, Line 309: input_partition_
> 'input_partition_'

PS14, Line 384: builder_->hash_partitions
> is this referring to the entries in the builder_->hash_partition_ array?
Tried to clarify a bit.

PS14, Line 385: This is not used when processing a single partition.
> what does this mean?
Clarified in terms of the states defined above. When we're processing a single spilled partition
we don't need to repartition the probe.

PS14, Line 408: null_aware_
> 'null_aware_'

PS14, Line 410:  we then iterate over all the probe rows
> I can't tell if this means literally all the probe rows, or all the rows in
Clarified that it's the partition's. Most of the text was just copied verbatim from the original
comment - I didn't want to start rewriting all the comments and making this patch even larger.

PS14, Line 432: and
> this makes it sound like there are two conditions required to create a prob

PS14, Line 436: preallocated
> what does this mean?
Reworded to clarify.

PS14, Line 437: and
> double and.  And is this saying the constructor prepares it or the caller s

PS14, Line 443: should
> will

Line 445:     /// block cannot be acquired. "delete_on_read" mode is used, so blocks in the
> ... used for the buffered tuple stream, so..
Reworded in a slightly different way.

PS14, Line 464: meaning
              :     /// it has to call Close() on it) but transferred to the parent exec node
> this doesn't seem right. don't we either Close() it or transfer it (not bot
I interpreted the comment as meaning that it has to call Close() on it unless it transfers
ownership. I just removed the parenthetical statement since it's more confusing than clarifying.
File be/src/util/

PS14, Line 409: DCHECK
Removed this line as part of refactoring.

Line 412:   children_.insert(children_.begin(), child_entry);
> rather than having two special cases (here and line 395-399, how about maki


Gerrit-MessageType: comment
Gerrit-Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Gerrit-PatchSet: 14
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Tim Armstrong <>
Gerrit-Reviewer: Alex Behm <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Tim Armstrong <>
Gerrit-HasComments: Yes
