spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "David McLennan (JIRA)" <>
Subject [jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
Date Wed, 13 Jun 2018 22:48:00 GMT


David McLennan commented on SPARK-19609:

This feature would be extremely useful in making external data lookups more efficient.  For
example, you have a stream of data coming in with a window of 10,000 messages.  You need
to join each message with reference data on external services to enrich it (for example accounts
and products).  Today, you would either have to pull the entire external data sources into
the executors (expensive on all sides - even the small datasets are many 10's of gigabyres),
or lookup the external datasets key by key on a per message basis, which is very chatty from
a communication perspective.  If this feature is implemented, it could reduce the amount
of data transfer significantly, if the cardinality of the join keys is low (i.e. you might
have 10,000 messages, but they reference only 15 unique accounts and 50 unique products.) 
It would also relieve the author of the burden of having to implement something which does
this themselves - they could just register the dataframes, run a sql context ontop of it,
and go.

> Broadcast joins should pushdown join constraints as Filter to the larger relation
> ---------------------------------------------------------------------------------
>                 Key: SPARK-19609
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Nick Dimiduk
>            Priority: Major
> For broadcast inner-joins, where the smaller relation is known to be small enough to
materialize on a worker, the set of values for all join columns is known and fits in memory.
Spark should translate these values into a {{Filter}} pushed down to the datasource. The common
join condition of equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause.
An example of pushing such filters is already present in the form of {{IsNotNull}} filters
via [~sameerag]'s work on SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit entirely in
memory. This could be done by partitioning the smaller relation into N pieces, applying this
predicate pushdown for each piece, and unioning the results.

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message