spark-issues mailing list archives

From "Jungtaek Lim (JIRA)" <>
Subject [jira] [Commented] (SPARK-10816) EventTime based sessionization
Date Wed, 17 Oct 2018 04:18:00 GMT


Jungtaek Lim commented on SPARK-10816:

I've been thinking about the state store requirements for session windows, and it looks hard to optimize (there are certainly trade-offs to choose between).

Two major factors here:

A. Update
 * New session(s) can be placed "before" previous sessions.
 * New session(s) can be placed between two previous sessions.
 * Multiple previous sessions can be merged into one due to new event(s).
 * New session(s) can be placed "after" previous sessions.
 * Any value, including the session start timestamp and session end timestamp, can change.
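To make the merge case concrete, here is a minimal sketch (Python for illustration; the names and the gap-based merge rule are assumptions, not Spark internals) of folding one new event into a sorted session list:

```python
def add_event(sessions, event_time, gap):
    """Fold a new event into a sorted list of (start, end) sessions.

    The event opens a candidate session [event_time, event_time + gap];
    every previous session it overlaps is absorbed into it, which covers
    all the update cases above: before, between, after, and multi-merge.
    """
    new_start, new_end = event_time, event_time + gap
    kept = []
    for start, end in sessions:
        if end < new_start or start > new_end:
            kept.append((start, end))          # disjoint: keep as-is
        else:
            new_start = min(new_start, start)  # overlap: absorb into the
            new_end = max(new_end, end)        # merged session
    kept.append((new_start, new_end))
    return sorted(kept)
```

Note that a single event landing between two existing sessions can collapse both into one, which is why in-place updates end up shifting neighbouring elements.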

B. Evict
 * Eviction must iterate over all keys, not just the keys contained in the current batch's input rows.
 * If the sessions are sorted (sorting by "session start" also sorts by "session end", since by the concept of a "session" they cannot overlap), eviction normally proceeds from the start, so we can keep evicting sessions for a given key until we find a session that doesn't meet the eviction condition.
 ** This would help when a key has numerous sessions, though in practice I wouldn't expect a key to have that many.
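The early-exit eviction above can be sketched as follows (Python for illustration; assumes non-overlapping sessions kept sorted by start, and hence by end):

```python
def evict(sessions, watermark):
    """Split a sorted session list into (evicted, kept).

    Sessions are (start, end) pairs sorted by start; since sessions for one
    key cannot overlap, they are also sorted by end, so scanning can stop
    at the first session whose end is still beyond the watermark.
    """
    for i, (_, end) in enumerate(sessions):
        if end > watermark:
            return sessions[:i], sessions[i:]
    return sessions, []
```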

Taken together, the facts above require shifting elements in an array-like structure, often shifting most (sometimes all) of the elements left or right. We care not only about the overall shape of the state, but also about the delta produced by every operation (rather than the delta between two snapshots), which sometimes calls for a different strategy.

So ideally, applying the diff between old sessions and new sessions to the state would minimize the delta; a less optimal approach is to overwrite the previous sessions with the new ones (still with consideration for reducing the delta).
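A rough sketch of the diff-based idea (Python, hypothetical names, not Spark code): only sessions that actually changed are written out, instead of overwriting the whole list:

```python
def session_delta(old, new):
    """Return (removed, added) sessions between two versions of a key's list.

    Writing only these to the state store's delta would minimize delta size;
    the fallback is overwriting with all of `new`, which rewrites every
    session whether or not it changed.
    """
    old_set, new_set = set(old), set(new)
    removed = [s for s in old if s not in new_set]
    added = [s for s in new if s not in old_set]
    return removed, added
```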


Based on the facts above, I feel that choosing (key, list of sessions) as the state data structure (Baidu's approach) is worth considering compared to (key, count of sessions) & (key + index, session) (the streaming-join layout), because:
 * In the latter approach, the key part is always duplicated, so it is written to the delta N times when N elements change, or N + 1 times if the count changes as well.
 * In the latter approach, optimizing the removal of an element means not respecting insertion order, whereas most optimizations for applying new sessions rely on the previous sessions being sorted.
 ** This requires an additional sort, and makes applying new sessions to the state more complicated.
 *** This is why HWX's patch simply removes all sessions and appends all the new ones, leaving the optimization as a TODO.
 ** The removal optimization avoids shifting, which is a non-trivial operation, so it is a "trade-off".
 ** During eviction, it must traverse all the sessions, since they are not sorted.
 * In the former approach, the major issue is that all sessions for a given key must be written to the delta. Fortunately, here the key part occurs only once.
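A back-of-the-envelope count of delta rows for the two layouts (illustrative only, under the assumptions above), when N of a key's sessions change in a batch:

```python
def delta_rows_list_layout(changed_sessions):
    # (key, list of sessions): the whole list is rewritten as one row,
    # so the key appears in the delta exactly once regardless of N --
    # at the cost of rewriting unchanged sessions inside that row.
    return 1

def delta_rows_indexed_layout(changed_sessions, count_changed):
    # (key, count) & (key + index, session): one row per changed session
    # (key duplicated each time), plus one more row if the count changed.
    return changed_sessions + (1 if count_changed else 0)
```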


I'd like to hear others' thoughts/opinions on this.


> EventTime based sessionization
> ------------------------------
>                 Key: SPARK-10816
>                 URL:
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf, Session Window
Support For Structure Streaming.pdf

This message was sent by Atlassian JIRA
