beam-commits mailing list archives

From "Eugene Kirpichov (JIRA)" <>
Subject [jira] [Commented] (BEAM-217) BoundedSource.splitAtFraction should be splitAfterFraction
Date Wed, 26 Apr 2017 21:38:04 GMT


Eugene Kirpichov commented on BEAM-217:

I don't think so. It hasn't come up as a problem in practice, so it seems acceptable to keep
BoundedSource as-is, though of course we should do things the right way in SDF (Splittable
DoFn), and the SDF design already does this the right way.

> BoundedSource.splitAtFraction should be splitAfterFraction
> ----------------------------------------------------------
>                 Key: BEAM-217
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Minor
>              Labels: backward-incompatible
>             Fix For: Not applicable
> Dynamic work rebalancing works by 1) determining how long the bundle should take in order
> to not be a straggler (the "deadline"), 2) predicting where the bundle will be (position or
> fraction) by that deadline, and 3) requesting an atomic split (splitAtFraction).
> Currently all BoundedSources and (in the Dataflow runner) NativeReaderIterators refuse
> splits if they have already consumed the requested split position.
> Splitting a task [A, C) at position B generates [A, B) and [B, C), so if we predict that
> by the deadline the task will have last consumed position X, we should split not "at" X but
> "after" X (i.e. at next(X)): into [A, X] (because X is already consumed) and (X, C), or
> equivalently [A, next(X)) and [next(X), C).
> One way to fit this into the current BoundedSource API is to rename splitAtFraction to
> splitAfterFraction and adjust the documentation. Documentation of getFractionConsumed also
> needs to be clarified to emphasize that it should return what fraction of all positions in
> the source have already been consumed, including the position of the last consumed record.
> For example, for an index-range task with range [0, 5), after it has read the first record
> at position 0, it has consumed 20%, rather than 0% (and of course not 40%, even if an internal
> "next index" variable is now 1). This mistake is especially easy to make in a file-based
> source if you base the calculations on the file's offset *after* consuming the record; the
> correct way is to calculate based on the offsets at which records begin.
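
The two points in the quoted description (counting the last consumed position in the
fraction, and splitting "after" X rather than "at" X) can be illustrated with a small
standalone sketch. This is not Beam code: IndexRangeSketch, advance, fractionConsumed and
splitAfter are hypothetical names invented for illustration, and the class only models an
index-range task [start, end) with one record per position.

```java
// Hypothetical, Beam-independent model of an index-range task [start, end).
class IndexRangeSketch {
    private final long start;   // inclusive start of the range
    private long end;           // exclusive end; shrinks after a successful split
    private long lastConsumed;  // position of the last record read; start - 1 if none

    IndexRangeSketch(long start, long end) {
        this.start = start;
        this.end = end;
        this.lastConsumed = start - 1;
    }

    // Consumes the next position if one remains in [start, end).
    boolean advance() {
        if (lastConsumed + 1 >= end) {
            return false;
        }
        lastConsumed++;
        return true;
    }

    // Fraction of positions consumed, counting the last consumed position itself.
    double fractionConsumed() {
        return (double) (lastConsumed - start + 1) / (end - start);
    }

    // Splits "after" x: the primary keeps [start, x + 1), the residual is [x + 1, end).
    // Returns the residual's start position, or -1 if the split is refused.
    long splitAfter(long x) {
        long splitPos = x + 1; // next(x) for an index range
        // Refuse if x lies before the range, if positions after x were already
        // consumed, or if nothing would be left for the residual.
        if (x < start || x < lastConsumed || splitPos >= end) {
            return -1;
        }
        end = splitPos;
        return splitPos;
    }
}
```

With range [0, 5) and one record read, fractionConsumed() in this sketch gives 0.2, and
splitAfter(0) is accepted even though position 0 is already consumed, leaving [0, 1) as
the primary and [1, 5) as the residual. A split "at" position 0 would have to be refused
under the current behavior described above.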

This message was sent by Atlassian JIRA
