spark-issues mailing list archives

From "Li Jin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL
Date Thu, 11 Jan 2018 19:48:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322847#comment-16322847
] 

Li Jin commented on SPARK-22947:
--------------------------------

I have looked at SPARK-8682 and tried to figure out a general way to optimize range joins. However,
I think it is quite hard to optimize the general case of a range join.

Consider even the simplest form:
{code:java}
dfA.join(dfB, dfA.time < dfB.time)
{code}
For each partition [b, e) of dfA (range partitioned on time), we need to join it with all rows
of dfB in the range [b, +inf), because the predicate is unbounded. This would result in
complexity similar to that of a cartesian product.
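
To make the cost concrete, here is a rough sketch in plain Python (not Spark) that counts how many dfB rows each dfA partition would need under the unbounded predicate. The function name and the toy data are made up for illustration only.

```python
from bisect import bisect_left


def right_rows_per_left_partition(partition_starts, right_times):
    """For each left partition starting at b, count right rows in [b, +inf)."""
    right_sorted = sorted(right_times)
    n = len(right_sorted)
    # bisect_left finds the first right row with time >= b.
    return [n - bisect_left(right_sorted, b) for b in partition_starts]


# Toy data (assumption): 100 right rows, 10 left partitions of width 10.
right_times = list(range(100))
partition_starts = list(range(0, 100, 10))

needed = right_rows_per_left_partition(partition_starts, right_times)
total_shipped = sum(needed)
full_replication = len(partition_starts) * len(right_times)

print(needed)            # rows of dfB needed by each dfA partition
print(total_shipped)     # total rows moved across all partitions
print(full_replication)  # cost of sending all of dfB to every partition
```

In this toy case the total is about half of full replication, so it grows with (number of partitions) x (number of rows), the same order as a cartesian product.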

However, in time series analysis, people usually want to join points in time that are "close
to each other". This means the join predicate often looks like:

{code:java}
dfA.join(dfB, (dfB.time - a < dfA.time) & (dfA.time < dfB.time + b))
{code}

This is a bounded range predicate on a single attribute, and I think this kind of join can be
optimized as described in "Optional Design Sketch: Implementation A".
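
As a rough illustration of that sketch, the following plain Python (not Spark) partitions the left side into non-overlapping ranges and the right side into overlapping ranges widened by the lookback, then joins each pair locally. Times are plain integers, and all function names and data here are hypothetical; the predicate used is "right row is at most `lookback` before the left row", which is one bounded case of the predicate above.

```python
from collections import defaultdict


def partition_left(rows, begin, end, width):
    """Non-overlapping partitions: i covers [begin + i*width, begin + (i+1)*width)."""
    parts = defaultdict(list)
    for row in rows:
        t = row[0]
        if begin <= t < end:
            parts[(t - begin) // width].append(row)
    return parts


def partition_right(rows, begin, end, width, lookback):
    """Overlapping partitions: i covers [begin + i*width - lookback, begin + (i+1)*width)."""
    n_parts = -(-(end - begin) // width)  # ceiling division
    parts = defaultdict(list)
    for row in rows:
        t = row[0]
        first = max(0, (t - begin) // width)
        last = min(n_parts - 1, (t + lookback - begin) // width)
        for i in range(first, last + 1):  # a row may land in several partitions
            parts[i].append(row)
    return parts


def bounded_range_join(left, right, begin, end, width, lookback):
    left_parts = partition_left(left, begin, end, width)
    right_parts = partition_right(right, begin, end, width, lookback)
    out = []
    for i, left_rows in sorted(left_parts.items()):
        # Everything a left partition can match is in the paired right partition.
        for l in left_rows:
            for r in right_parts.get(i, []):
                if 0 <= l[0] - r[0] <= lookback:
                    out.append((l, r))
    return out


left = [(0, "a"), (1, "b"), (3, "c"), (4, "d")]
right = [(-1, "x"), (3, "y"), (4, "z")]
result = bounded_range_join(left, right, begin=0, end=5, width=1, lookback=1)
for pair in result:
    print(pair)
```

Each right row is copied into only the few partitions whose widened range contains it, so the data moved stays proportional to the input size when the lookback is small relative to the partition width.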

Then there is the second step: what to do with the matching rows. In the case of an
"as-of" join, the second step is "pick the closest row". In other operations, the second step
can be "compute average", "return all matched rows", or "return the first/last x rows". But
the second step is just a local operation once the potentially matching data are in the same partition.
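
For the as-of case, the local "pick the closest row" step could look roughly like the plain Python below (not Spark, and not the proposed API). It uses the data from Use Case A in the description, with a one day lookback; `parse_day` and `asof_pick` are names I made up for this sketch.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def parse_day(s):
    return datetime.strptime(s, "%Y%m%d").date()


def asof_pick(left, right, lookback):
    """For each left row, pick the latest right row at or before it, within lookback."""
    right_sorted = sorted(right, key=lambda r: r[0])
    right_times = [r[0] for r in right_sorted]
    out = []
    for t, value in left:
        i = bisect_right(right_times, t) - 1  # latest right row with time <= t
        if i >= 0 and t - right_times[i] <= lookback:
            out.append((t, value, right_sorted[i][1]))
        else:
            out.append((t, value, None))  # no right row inside the tolerance
    return out


df_a = [(parse_day(d), q) for d, q in
        [("20160101", 100), ("20160102", 50), ("20160104", -50), ("20160105", 100)]]
df_b = [(parse_day(d), p) for d, p in
        [("20151231", 100.0), ("20160104", 105.0), ("20160105", 102.0)]]

result = asof_pick(df_a, df_b, lookback=timedelta(days=1))
for t, quantity, price in result:
    print(t.strftime("%Y%m%d"), quantity, price)
```

The other second steps ("compute average", "return all matched rows") would replace only the body of the loop; the partitioning before it stays the same.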

This is a very common pattern in the time series analysis that we are doing and, as far as
I can tell, there is no good way to describe this analysis pattern in the current Spark SQL
API. But I think the Spark SQL execution engine can execute such a pattern efficiently, and
Catalyst can optimize such operations quite well (for instance, it is not hard for Catalyst
to optimize an as-of join on "time" followed by a groupby on "time" to avoid the shuffle step
in the groupby).

Given how useful and fundamental this pattern is in time series analysis, I think it would
be a good win for Spark if such a pattern were supported.




> SPIP: as-of join in Spark SQL
> -----------------------------
>
>                 Key: SPARK-22947
>                 URL: https://issues.apache.org/jira/browse/SPARK-22947
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.1
>            Reporter: Li Jin
>         Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common analyses on financial data. In time series
analysis, as-of join is a very common operation. Supporting as-of join in Spark SQL will enable
many use cases of Spark SQL for time series analysis.
> As-of join is “join on time” with inexact time matching criteria. Various libraries
have implemented as-of join or similar functionality:
> Kdb: https://code.kx.com/wiki/Reference/aj
> Pandas: http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> R: This functionality is called “Last Observation Carried Forward”
> https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing a new API in Spark SQL to support as-of join.
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of join of multiple tables (>2) should be performant, because it’s very common
that users need to join multiple data sources together for further analysis.
> * Define Distribution, Partitioning and shuffle strategy for ordered time series data
> h2. Non-Goals
> These are out of scope for this SPIP and should be considered in future SPIPs as
improvements to Spark’s time series analysis ability:
> * Utilize partition information from the data source, i.e., the begin/end of each partition, to
reduce sorting/shuffling
> * Define an API for users to implement an as-of join time spec based on a business calendar (e.g., look back
one business day; this is very common in financial data analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis. It has a begin time
(inclusive) and an end time (exclusive). Users should be able to change the time scope of the
analysis (e.g., from one month to five years) by just changing the TimeContext.
> To the Spark engine, TimeContext is a hint that:
> * can be used to repartition data for join
> * serves as a predicate that can be pushed down to the storage layer
> Time context is similar to filtering time by begin/end. The main difference is that the time
context can be expanded based on the operation taken (see the example in as-of join).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
> h3. asofJoin
> h4. Use Case A (join without key)
> Join two DataFrames on time, with one day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This is an important
illustration of the time context - it is able to expand the context to 20151231 on dfB because
of the 1 day lookback.
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, id, quantity
> 20160101, 1, 100
> 20160101, 2, 50
> 20160102, 1, -50
> 20160102, 2, 50
> dfB:
> time, id, price
> 20151231, 1, 100.0
> 20160102, 1, 105.0
> 20160102, 2, 195.0
> output:
> time, id, quantity, price
> 20160101, 1, 100, 100.0
> 20160101, 2, 50, null
> 20160102, 1, -50, 105.0
> 20160102, 2, 50, 195.0
> {code}
> h2. Optional Design Sketch
> h3. Implementation A
> (This is just an initial thought on how to implement this.)
> (1) Using begin/end of the TimeContext, we first partition the left DataFrame into non-overlapping
partitions. For the purpose of demonstration, assume we partition it into one-day partitions:
> {code:java}
> [20160101, 20160102) [20160102, 20160103) ... [20161231, 20170101)
> {code}
> (2) Then we partition the right DataFrame into overlapping partitions, taking into account
the tolerance, e.g. one day lookback:
> {code:java}
> [20151231, 20160102) [20160101, 20160103) ... [20161230, 20170101)
> {code}
> (3) Pair left and right partitions
> (4) For each pair of partitions, because all data for the join is in the partition pair,
we can now join the partition pair locally.
> (5) Use partitioning in (1) as the output distribution so we can reuse it for sequential
asof joins 
> h2. Optional Rejected Sketch
> h3. Rejected Implementation A
> Another implementation is to sample the data to figure out its time range instead of
using a time context. This approach is implemented in https://github.com/twosigma/flint
> This approach suffers in performance if sampling data is expensive. For instance, when
the data to be sampled is the output of an expensive computation, sampling the data would
cause the expensive computation to be done twice.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
