beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
Date Fri, 10 Mar 2017 22:25:04 GMT


Kenneth Knowles commented on BEAM-1612:

I just noticed this, and I agree strongly that it needs to be solved. It is a major concern
for all runners to get this right in order to have realistic performance.

There are some questions surrounding when and how to output data from {{@FinishBundle}} since
it generally doesn't work with windowing. Bundles are not related to a window and can have
data from lots of windows. I filed BEAM-1283 because I think the spec is very bad, and [~tgroh]
filed BEAM-1312 with an even stronger viewpoint that there should be no output, only flush-like
operations. But sometimes a flush may return data that you need to output. Any such output
should be mostly deterministic and independent of bundling.
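
A minimal sketch of the windowing problem, in plain Java with hypothetical names
(WindowedBuffer and the string window ids are illustrative stand-ins, not Beam's DoFn API).
Because one bundle can hold elements from several windows, anything emitted when the bundle
finishes has to say which window it belongs to:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a buffering user function; not Beam's DoFn API.
class WindowedBuffer {
    // Pending partial sums, keyed by a (hypothetical) window id.
    private final Map<String, Integer> pending = new TreeMap<>();
    // Records "window=value" for every output emitted at bundle end.
    final List<String> outputs = new ArrayList<>();

    // Elements from different windows may arrive interleaved in one bundle.
    void processElement(String window, int value) {
        pending.merge(window, value, Integer::sum);
    }

    // Emits one output per window seen in this bundle, tagged with its window,
    // then clears the buffer so nothing carries over into the next bundle.
    void finishBundle() {
        for (Map.Entry<String, Integer> e : pending.entrySet()) {
            outputs.add(e.getKey() + "=" + e.getValue());
        }
        pending.clear();
    }
}
```

The partial sums here depend on where the bundle boundary falls, so a downstream step would
still have to combine them for the final result to be independent of bundling.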

For the relationship with snapshotting, I do want to make sure the ordering is clear: the
runner is required to call {{FinishBundle}} before durably committing; otherwise the work
might not actually be committed. The commit can still fail, though, so the runner is not
required to commit right away. So it would be fine to call {{FinishBundle}} every once in a
while and take a snapshot even less often. The important thing is that you can't have other
method calls between the {{FinishBundle}} and the commit, because they might set up new
transient state that needs to be flushed.
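
The ordering described above could be sketched roughly as follows. This is plain Java under
stated assumptions: BufferingFn and BundleCommitter are hypothetical names, not Beam or Flink
classes, and the commit is reduced to a boolean that says whether it succeeded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user function that buffers elements and flushes at bundle end.
class BufferingFn {
    private final List<String> buffer = new ArrayList<>(); // transient state
    final List<String> flushed = new ArrayList<>();        // stands in for an external sink

    void startBundle() { buffer.clear(); }
    void processElement(String e) { buffer.add(e); }
    void finishBundle() { flushed.addAll(buffer); buffer.clear(); }
    boolean hasTransientState() { return !buffer.isEmpty(); }
}

// Hypothetical runner-side wrapper that enforces "finish, then commit".
class BundleCommitter {
    private final BufferingFn fn;
    private boolean bundleOpen = false;
    int commits = 0;

    BundleCommitter(BufferingFn fn) { this.fn = fn; }

    void process(String element) {
        if (!bundleOpen) { fn.startBundle(); bundleOpen = true; }
        fn.processElement(element);
    }

    // Finishing a bundle does not force a commit; this may run often.
    void finishBundle() {
        if (bundleOpen) { fn.finishBundle(); bundleOpen = false; }
    }

    // A snapshot finishes the open bundle and commits with no calls in between.
    // The commit may fail, in which case nothing is counted as committed.
    boolean snapshot(boolean commitSucceeds) {
        finishBundle();
        if (fn.hasTransientState()) {
            throw new IllegalStateException("unflushed state at commit time");
        }
        if (commitSucceeds) { commits++; }
        return commitSucceeds;
    }
}
```

Calling finishBundle() several times between snapshots is allowed in this sketch; what it
rules out is a process() call landing between the flush and the commit inside snapshot().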

> Support real Bundle in Flink runner
> -----------------------------------
>                 Key: BEAM-1612
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
> The Bundle is very important in the Beam model. Users can use the bundle to flush buffers
> and to reuse heavyweight resources within a bundle. Most IO plugins use the bundle to flush.
>
> Moreover, FlinkRunner can also use the Bundle to reduce access to the FlinkState, for example
> by first placing data in the JavaHeap and flushing it into RocksDbState when finishBundle is
> invoked. This can reduce the number of serializations.
> But currently FlinkRunner calls finishBundle for every processElement. We need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But sometimes this
> "Bundle" may be too big. This depends on the user's checkpoint configuration.
> 2. Manually control the size of the bundle. The half-bundle will be flushed to a full-bundle
> by count or eventTime or processTime or {{snapshot}}. We do not need to wait, just call
> startBundle and finishBundle at the right time.
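
Option 2 above could look something like the following sketch. It is plain Java, not Flink
runner code: BundleTrigger, its limits, and the injected clock are all hypothetical, and the
time limit is only checked when an element arrives (a real runner would more likely use a
timer). Event-time triggering is left out.

```java
import java.util.function.LongSupplier;

// Hypothetical helper that decides when to close a bundle.
class BundleTrigger {
    private final long maxCount;     // close after this many elements
    private final long maxMillis;    // or after this much processing time
    private final LongSupplier clock; // injected so the time source is swappable
    private long count = 0;
    private long bundleStart = 0;
    int bundlesFinished = 0;

    BundleTrigger(long maxCount, long maxMillis, LongSupplier clock) {
        this.maxCount = maxCount;
        this.maxMillis = maxMillis;
        this.clock = clock;
    }

    // Called once per element; opens a bundle lazily and closes it at a limit.
    void onElement() {
        if (count == 0) {
            bundleStart = clock.getAsLong(); // startBundle would be called here
        }
        count++;
        if (count >= maxCount || clock.getAsLong() - bundleStart >= maxMillis) {
            finish();
        }
    }

    // A snapshot always closes a half-filled bundle first.
    void onSnapshot() {
        if (count > 0) {
            finish();
        }
    }

    private void finish() {
        bundlesFinished++; // finishBundle would be called here
        count = 0;
    }
}
```

With a production clock this would be constructed as, for example,
new BundleTrigger(1000, 1000, System::currentTimeMillis); both limits are arbitrary.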

This message was sent by Atlassian JIRA
