beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
Date Mon, 13 Mar 2017 09:30:04 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907069#comment-15907069
] 

Aljoscha Krettek commented on BEAM-1612:
----------------------------------------

I think what we could do is keep state/state updates in in-memory data structures and only
on commit (which is either by time or volume, i.e. how many elements) we manifest these changes,
which are a diff to the changes we keep in Flink state, to Flink state. This would not make
the changes durable, that only happens when the next Flink checkpoint succeeds.

What do you think?

By the way, a Flink checkpoint can also fail, so if we committed right before checkpointing
we would still not have any guarantees of this actually succeeding.

> Support real Bundle in Flink runner
> -----------------------------------
>
>                 Key: BEAM-1612
>                 URL: https://issues.apache.org/jira/browse/BEAM-1612
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>
> The Bundle is very important in the beam model. Users can use the bundle to flush buffer,
can reuse many heavyweight resources in a bundle. Most IO plugins use the bundle to flush.

> Moreover, FlinkRunner can also use Bundle to reduce access to the FlinkState, such as
first placed in JavaHeap, flush into RocksDbState when invoke finishBundle , this can reduce
the number of serialization.
> But now FlinkRunner calls the finishBundle every processElement. We need support real
Bundle.
> I think we can have the following implementations:
> 1.Invoke finishBundle and next startBundle in {{snapshot}} of Flink. But sometimes this
"Bundle" maybe too big. This depends on the user's checkpoint configuration.
> 2.Manually control the size of the bundle. The half-bundle will be flushed to a full-bundle
by count or eventTime or processTime or {{snapshot}}. We do not need to wait, just call the
startBundle and finishBundle at the right time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message