spark-issues mailing list archives

From "Jungtaek Lim (JIRA)" <>
Subject [jira] [Commented] (SPARK-10816) EventTime based sessionization
Date Mon, 22 Oct 2018 05:51:00 GMT


Jungtaek Lim commented on SPARK-10816:

Just going back to review the original comment of [~zsxwing]. I guess it is a state-oriented
view, and various approaches are still available for how to leverage the state format.
(For example, one big physical exec vs. smaller physical execs leveraging existing physical
execs.)

For now, let's concentrate on the pros and cons of each state format.

[1] is pretty clear that it works, and it is very simple to implement. But it is also
pretty clear that it requires loading all the sessions for a given key into memory, as well
as shifting elements left and right in the array. It may also require a search to determine
whether a session is updated or new. Moreover, it always overwrites the value of a given key,
so all the sessions for that key are written to the delta once there is any change on the key.
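To make the trade-off concrete, here is a minimal Python sketch of option [1] (not Spark code; the class and method names are hypothetical, assuming state is one sorted array of sessions per key):

```python
# Sketch of option [1]: key -> one value holding ALL sessions for that key.
# Every update loads the whole list, merges, shifts, and rewrites the value.
from bisect import insort

class AllSessionsPerKeyStore:
    def __init__(self):
        self._state = {}   # key -> list of (start, end), sorted by start

    def add_event(self, key, event_time, gap):
        sessions = self._state.get(key, [])       # load ALL sessions for the key
        merged = (event_time, event_time + gap)
        rest = []
        for s in sessions:
            # merge any session that overlaps the new one
            if s[1] >= merged[0] and s[0] <= merged[1]:
                merged = (min(s[0], merged[0]), max(s[1], merged[1]))
            else:
                rest.append(s)
        insort(rest, merged)                      # shifting left/right happens here
        self._state[key] = rest                   # whole value rewritten -> full delta

store = AllSessionsPerKeyStore()
store.add_event("user1", 100, 10)
store.add_event("user1", 130, 10)   # far enough away: a second session
store.add_event("user1", 105, 10)   # overlaps and extends the first session
```

Note that even the third event, which touches only the first session, forces the entire list for "user1" to be rewritten.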

[2] would help to overwrite only the updated sessions (which requires seek-and-write overhead
in [1] and [3]), assuming the start timestamp of a session is unchanged. Unfortunately the
assumption is not correct: the start timestamp of a session can also be modified by a late
event, so we need to handle that corner case, which means we still need to traverse the
sessions for a given key, which would be hugely inefficient. If keys are not sorted, it
requires a full traversal of the key space.
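The corner case can be illustrated with a small sketch of option [2] (again not Spark code; names are hypothetical, assuming state is keyed by (key, session start)):

```python
# Sketch of option [2]: state keyed by (key, session_start) -> session_end.
# A late event can move the start backwards, so the old entry must be found
# and removed, which without sorted keys means scanning the key space.
class SessionByStartStore:
    def __init__(self):
        self._state = {}   # (key, start) -> end

    def add_event(self, key, event_time, gap):
        new_start, new_end = event_time, event_time + gap
        # full scan to find overlapping sessions: this is the inefficiency
        overlapping = [(k, s) for (k, s), e in self._state.items()
                       if k == key and s <= new_end and e >= new_start]
        for k, s in overlapping:
            end = self._state.pop((k, s))   # stale (key, start) entry removed
            new_start = min(new_start, s)
            new_end = max(new_end, end)
        self._state[(key, new_start)] = new_end

store = SessionByStartStore()
store.add_event("user1", 100, 10)
store.add_event("user1", 95, 10)   # late event: session start moves from 100 to 95
```

The late event invalidates the old (key, 100) entry, so keying by start timestamp alone does not avoid traversal.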

[3] is the option that lets us play with the trade-off. It works basically like [1], since
it simulates an indexable array. It has some extra overhead from manipulating two states, but
it does not require loading all the sessions for a given key into memory. Moreover, it can
be tweaked to reduce the delta on removal of elements, with a possible downside (broken
element order) that we have to trade off.
(One idea for addressing both element order and the huge delta from shifting is to place
tombstones instead of removing elements, and periodically remove the tombstones and shift,
maybe while taking a snapshot, if possible?)
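A minimal sketch of option [3] with the tombstone idea (not Spark code; all names are hypothetical, assuming two maps: key -> slot count and (key, index) -> session):

```python
# Sketch of option [3]: two states simulate an indexable array, so a single
# session can be written without loading the rest. Removal writes a tombstone
# instead of shifting, trading element order for a smaller delta.
TOMBSTONE = None

class IndexedSessionStore:
    def __init__(self):
        self._count = {}    # key -> number of slots allocated
        self._slots = {}    # (key, index) -> session or TOMBSTONE

    def append(self, key, session):
        i = self._count.get(key, 0)
        self._slots[(key, i)] = session        # only one slot written
        self._count[key] = i + 1

    def remove(self, key, index):
        self._slots[(key, index)] = TOMBSTONE  # no shifting, small delta

    def sessions(self, key):
        n = self._count.get(key, 0)
        return [self._slots[(key, i)] for i in range(n)
                if self._slots[(key, i)] is not TOMBSTONE]

    def compact(self, key):
        """Drop tombstones and reindex, e.g. while taking a snapshot."""
        live = self.sessions(key)
        for i in range(self._count.get(key, 0)):
            self._slots.pop((key, i), None)
        for i, s in enumerate(live):
            self._slots[(key, i)] = s
        self._count[key] = len(live)

store = IndexedSessionStore()
store.append("k", (100, 110))
store.append("k", (120, 130))
store.append("k", (140, 150))
store.remove("k", 1)            # tombstone instead of shifting
live = store.sessions("k")
store.compact("k")              # periodic cleanup, e.g. during snapshotting
```

Here a removal touches only one slot; the cost of restoring dense, ordered slots is deferred to the periodic compaction.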

I guess it is unlikely to have plenty of valid sessions for a given key at a specific time,
but that is just my guess, so [3] may be the preferred way to go. My patch leverages [3],
but because elements go out of order after removals in [3], the memory issue still exists
just as in [1], which I will try to address without introducing too much overhead.

Please let me know whether the above analysis would be good to include in the SPIP or the detailed design.

Btw, we may also want to talk about how to add this complexity: one big physical exec vs.
smaller physical execs leveraging existing physical execs. Both my patch and Baidu's take
the latter approach, but I can try to make changes if the Spark community prefers the former,
to achieve high cohesion in a specific exec instead of injecting new execs into multiple places.

> EventTime based sessionization
> ------------------------------
>                 Key: SPARK-10816
>                 URL:
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf, Session Window Support For Structure Streaming.pdf

This message was sent by Atlassian JIRA

