beam-commits mailing list archives

From "Eugene Kirpichov (JIRA)" <>
Subject [jira] [Created] (BEAM-217) BoundedSource.splitAtFraction should be splitAfterFraction
Date Thu, 21 Apr 2016 15:13:25 GMT
Eugene Kirpichov created BEAM-217:

             Summary: BoundedSource.splitAtFraction should be splitAfterFraction
                 Key: BEAM-217
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core
            Reporter: Eugene Kirpichov
            Assignee: Davor Bonaci

Dynamic work rebalancing works in three steps: 1) determining how long the bundle should take
so that it does not become a straggler (the "deadline"), 2) predicting where the bundle will be
(as a position or fraction) by that deadline, and 3) requesting an atomic split (splitAtFraction).
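
Step 2 could be sketched as follows. This is a minimal illustration that assumes the bundle keeps
processing at the average rate observed so far. The class and method names are hypothetical, and
this is not the Dataflow runner's actual prediction logic.

```java
// Hypothetical sketch of step 2: linear extrapolation of progress to the deadline.
// Not the actual Beam/Dataflow rebalancing code.
final class DeadlinePrediction {
  /**
   * Predicts the fraction consumed at the deadline, assuming a constant
   * processing rate equal to the average rate observed so far.
   */
  static double predictFractionAtDeadline(
      double fractionConsumedNow, double elapsedSeconds, double secondsUntilDeadline) {
    if (elapsedSeconds <= 0) {
      return fractionConsumedNow; // no rate information yet
    }
    double ratePerSecond = fractionConsumedNow / elapsedSeconds;
    double predicted = fractionConsumedNow + ratePerSecond * secondsUntilDeadline;
    return Math.min(1.0, predicted); // a bundle cannot be more than fully consumed
  }

  public static void main(String[] args) {
    // 25% consumed after 10 seconds, deadline 20 seconds away: about 0.75.
    System.out.println(predictFractionAtDeadline(0.25, 10, 20));
  }
}
```

The predicted fraction (or the position it maps to) is what step 3 would pass to the split request.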

Currently, all BoundedSources and (in the Dataflow runner) NativeReaderIterators refuse a split
if they have already consumed the requested split position.
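
The refusal rule can be illustrated with a small tracker over an integer index range. This is a
minimal sketch. IndexRangeTracker and trySplitAt are hypothetical names, not the SDK's own range
tracker API.

```java
// Hypothetical index-range tracker showing the refusal rule; not Beam's own class.
final class IndexRangeTracker {
  private final long start;
  private long end;          // exclusive; shrinks when a split is accepted
  private long lastConsumed; // position of the last record read

  IndexRangeTracker(long start, long end) {
    this.start = start;
    this.end = end;
    this.lastConsumed = start - 1; // nothing consumed yet
  }

  /** Records that the record at {@code index} has been read. */
  void consume(long index) {
    lastConsumed = index;
  }

  /** Splits [start, end) into [start, splitPos) and [splitPos, end), if allowed. */
  boolean trySplitAt(long splitPos) {
    if (splitPos <= lastConsumed) {
      return false; // the split position was already consumed: refuse
    }
    if (splitPos <= start || splitPos >= end) {
      return false; // one of the two pieces would be empty
    }
    end = splitPos; // this task keeps [start, splitPos)
    return true;
  }

  long end() {
    return end;
  }

  public static void main(String[] args) {
    IndexRangeTracker t = new IndexRangeTracker(0, 10);
    for (long i = 0; i <= 4; i++) {
      t.consume(i);
    }
    System.out.println(t.trySplitAt(4)); // false: position 4 is already consumed
    System.out.println(t.trySplitAt(5)); // true: this task now covers [0, 5)
    System.out.println(t.end());         // 5
  }
}
```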

Splitting a task [A, C) at position B generates [A, B) and [B, C). So if we predict that by the
deadline the task will have last consumed position X, splitting "at" X would be refused, because
X has already been consumed. We should split not "at" X but "after" X, i.e. at next(X): into
[A, X] (because X is already consumed) and (X, C), or equivalently [A, next(X)) and [next(X), C).
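
For a dense integer index range, next(X) is simply X + 1. Below is a minimal sketch of "split
after X" under that assumption. The method name splitAfter is hypothetical and is not part of the
BoundedSource API.

```java
import java.util.Arrays;

// Hypothetical helper: "split after X" on an integer range [a, c), assuming next(x) = x + 1.
final class SplitAfter {
  /** Returns {{a, next(x)}, {next(x), c}}, or null if either piece would be empty. */
  static long[][] splitAfter(long a, long c, long x) {
    long next = x + 1; // next(X) for a dense integer index range
    if (next <= a || next >= c) {
      return null; // nothing consumed yet, or nothing left to hand off
    }
    return new long[][] {{a, next}, {next, c}};
  }

  public static void main(String[] args) {
    // Task [0, 10) predicted to have last consumed position 4 by the deadline.
    System.out.println(Arrays.deepToString(splitAfter(0, 10, 4))); // [[0, 5], [5, 10]]
  }
}
```

Splitting "at" 4 instead would give the primary [0, 4), which excludes the already-consumed record
4, so a source following the rule above would reject it.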

One way to fit this into the current BoundedSource API is to rename splitAtFraction to
splitAfterFraction and adjust its documentation. The documentation of getFractionConsumed also
needs to be clarified to emphasize that it should return the fraction of all positions in the
source that have already been consumed, including the position of the last consumed record.

For example, for an index-range task with range [0, 5), after reading the first record at
position 0, the task has consumed 20%, not 0% (and certainly not 40%, even if an internal
"next index" variable is now 1). This mistake is especially easy to make in a file-based source
if you base the calculation on the file's offset *after* consuming the record; the correct way
is to base it on the offsets of the beginnings of records.
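
The index-range example above can be checked numerically. This is a minimal sketch; the class and
method names are hypothetical, and the formula counts every position up to and including the last
consumed one, as the proposed getFractionConsumed documentation describes.

```java
// Hypothetical sketch of the getFractionConsumed semantics proposed above.
final class FractionConsumed {
  /** Fraction of positions in [start, end) consumed, including lastConsumed itself. */
  static double fractionConsumed(long start, long end, long lastConsumed) {
    return (double) (lastConsumed - start + 1) / (end - start);
  }

  public static void main(String[] args) {
    long start = 0;
    long end = 5;
    long lastConsumed = 0; // the first record, at position 0, has been read
    long nextIndex = 1;    // a typical internal "next index" variable

    System.out.println(fractionConsumed(start, end, lastConsumed)); // 0.2 (correct)
    // Mistake 1: leaving out the last consumed position.
    System.out.println((double) (lastConsumed - start) / (end - start)); // 0.0
    // Mistake 2: treating the "next index" as if it were the last consumed position.
    System.out.println(fractionConsumed(start, end, nextIndex)); // 0.4
  }
}
```

In a file-based source, the analogue of mistake 2 is using the offset just past the record
(the start of the next record) instead of the offset where the consumed record begins.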

This message was sent by Atlassian JIRA
