hive-user mailing list archives

From Gopal Vijayaraghavan <gop...@apache.org>
Subject Re: Hash table in map join - Hive
Date Thu, 30 Jun 2016 16:11:16 GMT

> 1. In the query plan, it still says Map Join Operator (Would have
>expected it to be named as Reduce side operator).

The "Map" in that case refers really to Map<K,V> rather the hadoop
version. An unambigous name is if it were called the HashJoinOperator.

This is one of the current Tez optimizations: a map-join can be inserted
into any vertex, because "Map 1" is really just a name (it is a vertex, not
necessarily a map phase).
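
As a rough sketch of that idea (not Hive's operator code - Row, build(),
probe() etc. are made up for illustration), a hash join is just a Map<K,V>
built from the small side and probed by the big side:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal sketch: load the small table into an in-memory Map and stream
    // the big table through it, probing on the join key.
    public class HashJoinSketch {
      static class Row {
        Object key;
        Object[] values;
        Row(Object k, Object... v) { key = k; values = v; }
      }

      // Build phase: hash the small table on the join key.
      static Map<Object, List<Row>> build(Iterable<Row> smallTable) {
        Map<Object, List<Row>> table = new HashMap<>();
        for (Row r : smallTable) {
          table.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        return table;
      }

      // Probe phase: stream the big table and emit matching pairs downstream.
      static void probe(Iterable<Row> bigTable, Map<Object, List<Row>> table) {
        for (Row big : bigTable) {
          List<Row> matches = table.get(big.key);
          if (matches != null) {
            for (Row small : matches) {
              // emit the joined row (big, small) to the downstream operator
            }
          }
        }
      }
    }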

Also, even if the input format is Text/SequenceFile, reduce-side
vectorization can vectorize the simple join cases, because it is no longer
tied to the input format.
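
Just to illustrate the batching (this is only a sketch, not Hive's
vectorized operator code): a vectorized probe handles a whole batch of keys
per call and fills a selection vector, instead of going row by row - nothing
in that depends on a columnar input format.

    import java.util.Map;

    // Sketch only: probe a batch of keys at once and record which rows
    // matched; the caller emits joined output for the selected rows in one
    // pass. This works the same whether the rows came from Text,
    // SequenceFile or ORC.
    public class VectorizedProbeSketch {
      static final int BATCH_SIZE = 1024;

      static int probeBatch(long[] batchKeys, int batchLen,
                            Map<Long, long[]> hashTable, int[] selected) {
        int selectedCount = 0;
        for (int i = 0; i < batchLen; i++) {
          if (hashTable.containsKey(batchKeys[i])) {
            selected[selectedCount++] = i;
          }
        }
        return selectedCount;
      }
    }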

> 2. The edges in this query plans were named as custom_simple_edge: Is
>this the one pointing to the fact that sorting of mapper inputs are
>bypassed? 

Not directly related, but the custom edges bring their own edge manager -
the one used here could probably be replaced with a simple edge plus
unsorted input/output pairs since tez-0.5.x.

But the edge has an extension which can do some non-simple things too,
which is why Tez supports edge overrides like this.

 <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>


> 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
>shuffle hash join.

That issue was already reported by Twitter: the unsorted edges do not send
out the output size bitsets.

 <https://issues.apache.org/jira/browse/TEZ-3287>
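
For intuition only (this is not the Tez vertex manager code, just the
arithmetic it needs): picking a smaller reducer count requires the upstream
tasks to report how much output they actually produced, which is exactly
the statistic the unsorted edges were not sending.

    // Back-of-the-envelope sketch: shrink the reducer count so each reducer
    // gets roughly a target number of bytes. Without the reported sizes
    // this decision cannot be made.
    public class AutoParallelismSketch {
      static int pickReducerCount(long[] reportedOutputBytes,
                                  int configuredReducers,
                                  long desiredBytesPerReducer) {
        long total = 0;
        for (long b : reportedOutputBytes) {
          total += b;
        }
        int needed =
            (int) Math.max(1, (total + desiredBytesPerReducer - 1)
                                  / desiredBytesPerReducer);
        // Only ever reduce parallelism below what the plan asked for.
        return Math.min(configuredReducers, needed);
      }
    }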


> 1. What does tez.auto.reducer.parallelism do -- Does it only reduce the
>number of reducers based on the actual size of mapper output, or does it
>do more.

It does a bit more when the PipelinedSorter is enabled.

The sorted edges actually partition first and sort afterwards, so the sort
key is effectively (reducer-n, key), and the first few bytes of that
composite are stored in the metadata region of the sorter for a better L1
cache hit rate when sorting.

So the more reducers there are, the faster it sorts. However, each reducer's
output is compressed independently, so slicing the data too thin produces
bad network overheads.

Auto-reducer parallelism exists so that you don't need to tune each query
by hand to fit those trade-offs.
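
Going back to the (reducer-n, key) point - a hypothetical sketch of that
composite ordering (the real PipelinedSorter works on serialized bytes and a
metadata region, but the comparison order is the same idea):

    import java.util.Arrays;
    import java.util.Comparator;

    // Sketch only: sort records by (partition, key) so each reducer's slice
    // comes out contiguous. Keeping the partition and a key prefix in a
    // compact metadata area is what makes most comparisons cache-friendly.
    public class PartitionFirstSortSketch {
      static class Record {
        final int partition;   // reducer-n the record is routed to
        final byte[] key;      // serialized sort key

        Record(int partition, byte[] key) {
          this.partition = partition;
          this.key = key;
        }
      }

      static final Comparator<Record> PARTITION_THEN_KEY = (a, b) -> {
        if (a.partition != b.partition) {
          return Integer.compare(a.partition, b.partition);
        }
        return Arrays.compare(a.key, b.key);  // byte-wise key comparison
      };

      static void sort(Record[] records) {
        Arrays.sort(records, PARTITION_THEN_KEY);
      }
    }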

> 2. I did not understand the intuition behind setting
>hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
>reply).

Yes, it is the same impl from the wiki. But the grace hashjoin drops the
hashtable if it has spilled, between executions of the same vertex.

The regular hashjoin does not reload the hashtable when the split changes,
which means the grace hashjoin can take 4-5x more time than the optimized
one.

The time it takes to load the hashtable goes up, while the lookups aren't
much different because the grace hash-join has a bloom filter on top of it.

If you have 35,000 splits and 800 containers, the hash-build time adds up
pretty quickly.
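
A sketch of why the lookups stay cheap (hand-rolled here just for
illustration - not the actual Hive bloom filter): a small in-memory bloom
filter in front of the possibly-spilled hashtable lets most non-matching
big-table keys skip the expensive path entirely.

    import java.util.BitSet;

    // Sketch only: a tiny bloom filter guarding a hashtable that may have
    // spilled partitions to disk. "Definitely not present" answers never
    // touch the spilled data.
    public class BloomPreFilterSketch {
      private final BitSet bits;
      private final int size;

      BloomPreFilterSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
      }

      private int h1(long key) {
        return (int) Math.floorMod(key * 0x9E3779B97F4A7C15L, (long) size);
      }

      private int h2(long key) {
        return (int) Math.floorMod(Long.rotateLeft(key, 31) * 0xC2B2AE3DL,
                                   (long) size);
      }

      void addKey(long key) {
        bits.set(h1(key));
        bits.set(h2(key));
      }

      // false -> definitely not in the hashtable (cheap reject)
      // true  -> might be present; fall through to the real lookup, which
      //          may have to consult a spilled partition
      boolean mightContain(long key) {
        return bits.get(h1(key)) && bits.get(h2(key));
      }
    }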

> 3. In general, map join  in cluster mode, are these the actual steps
>followed in hive/tez:
> a. Hash table generation:  Partitioned hash tables of the small table is
>created across multiple containers. In each container, a part of the
>small table is dealt with. And in each container, the hash table is built

No, the broadcast tasks merely produce an unordered output - it is not a
hashtable.

This is done in parallel across multiple containers on the cluster, as you
describe (it tries for locality next to the small tables).

> b. Broadcast of hash table: All the partitions of all the parts of small
>table, including the ones spilled in the disk are serialized and sent to
>all the second map containers.

The broadcast is done via shuffle, the same as sorted data movement, but the
consumer reads the unordered streams and builds a hashtable inside every
JoinOperator.

The hashtable is then put into a cache in the task that is scoped to the
vertex - if the same vertex re-runs on the same container, it will reload
from the cache instead of the shuffle stream.

The grace hashtable throws away its in-memory data when it reloads a spilled
fraction of the hashtable, so the moment it has spilled it is no longer
considered for reuse.
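
The reuse pattern, as a sketch with made-up names (the real cache is keyed
and managed by Hive/Tez internally):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    // Sketch only: one cache per container, keyed by vertex name, so a
    // re-run of the same vertex on the same container skips rebuilding the
    // hashtable from the shuffle stream.
    public class VertexScopedCacheSketch<T> {
      private final Map<String, T> cache = new ConcurrentHashMap<>();

      // Return the cached hashtable for this vertex, or build it from the
      // broadcast/shuffle stream on a miss.
      T getOrBuild(String vertexName, Supplier<T> buildFromShuffle) {
        return cache.computeIfAbsent(vertexName, v -> buildFromShuffle.get());
      }

      // A grace hashtable that has spilled is evicted, so a re-run rebuilds.
      void evictIfSpilled(String vertexName, boolean hasSpilled) {
        if (hasSpilled) {
          cache.remove(vertexName);
        }
      }
    }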

> Where does the rebuilding of spilt hash table happen? Is it during
>second map phase where join is happening with bigger table?

The spilled hashtable looks exactly like the regular hashtable, but a lookup
has 3 possible return values - Yes, No, and Ask-Later.

So other than the handling of the Ask-Later scenario, it behaves exactly
like the full in-memory one.
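
A sketch of that three-valued lookup (the names are invented for
illustration): instead of a boolean hit/miss, a probe against a partly
spilled hashtable can also answer "ask later", and those rows are parked
until the spilled partition is reloaded.

    import java.util.List;

    // Sketch only: Yes/No behave exactly like the in-memory case; ASK_LATER
    // rows are set aside and re-probed once the spilled partition is back.
    public class SpilledProbeSketch {
      enum LookupResult { YES, NO, ASK_LATER }

      interface SpillableHashTable<K> {
        LookupResult lookup(K key);
      }

      static <K> void probe(Iterable<K> bigTableKeys,
                            SpillableHashTable<K> table,
                            List<K> parkedForLater) {
        for (K key : bigTableKeys) {
          switch (table.lookup(key)) {
            case YES:
              // emit joined output as usual
              break;
            case NO:
              // drop the row (inner join) or emit nulls (outer join)
              break;
            case ASK_LATER:
              // matching partition is on disk; revisit after it is reloaded
              parkedForLater.add(key);
              break;
          }
        }
      }
    }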

> c. Join operator:  The big table is scanned in each second mapper,
>against the entire hash table of small table, and result is got.

Yes.

The Hadoop Summit slides from 2014 linked above are a little out of date,
but they cover some of the basics of how this all fits together.

Cheers,
Gopal


