flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-4940) Add support for broadcast state
Date Tue, 16 Jan 2018 15:02:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327201#comment-16327201
] 

Kostas Kloudas edited comment on FLINK-4940 at 1/16/18 3:01 PM:
----------------------------------------------------------------

"We therefore only have to checkpoint the state of one arbitrary instance, for example instance
0." 

I tend to disagree with this, because it can create a hotspot upon restoring. Imagine we
have a parallelism of 1000 and FS as our backend: upon restoring, every task has to read
from the same disk(s).

In addition, I think that the broadcast state should be in the OperatorStateBackend. Broadcast
state is by definition not partitionable and therefore not a good fit for the KeyedStateBackend.
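
To make the hotspot concern concrete, here is a toy model rather than Flink code. It counts how many restoring tasks would read each checkpointed copy of the broadcast state. The class name, the method names, and the round-robin assignment of tasks to copies are assumptions made for illustration only; nothing here reflects how Flink actually assigns state handles on restore.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): counts reads per checkpointed copy on restore. */
final class RestoreReadModel {

    /**
     * @param parallelism        number of tasks that restore the broadcast state
     * @param checkpointedCopies number of instances whose copy was written at checkpoint time
     * @return map from copy index to the number of tasks reading that copy
     */
    static Map<Integer, Integer> readsPerCopy(int parallelism, int checkpointedCopies) {
        Map<Integer, Integer> reads = new HashMap<>();
        for (int task = 0; task < parallelism; task++) {
            // Assumed assignment: task i reads copy (i mod checkpointedCopies).
            reads.merge(task % checkpointedCopies, 1, Integer::sum);
        }
        return reads;
    }

    /** Largest number of tasks that hit a single copy, i.e. the size of the hotspot. */
    static int maxReads(int parallelism, int checkpointedCopies) {
        int max = 0;
        for (int count : readsPerCopy(parallelism, checkpointedCopies).values()) {
            max = Math.max(max, count);
        }
        return max;
    }
}
```

Under this model, `maxReads(1000, 1)` corresponds to checkpointing only instance 0, where all 1000 tasks read the same copy, while `maxReads(1000, 1000)` corresponds to every instance checkpointing its own copy, where each copy is read once.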


> Add support for broadcast state
> -------------------------------
>
>                 Key: FLINK-4940
>                 URL: https://issues.apache.org/jira/browse/FLINK-4940
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Major
>
> As mentioned in https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
we need broadcast state to support job patterns where one (or several) inputs are broadcast
to all operator instances and where we keep state that is mutated only based on input
from broadcast inputs. This special restriction ensures that the broadcast state is the same
on all parallel operator instances when checkpointing (except when using at-least-once mode).
We therefore only have to checkpoint the state of one arbitrary instance, for example instance
0.
> For the different types of side inputs we need different types of state. Luckily, the
side input types align with the state types we currently have for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put a more restrictive
API in front of them: mutation of broadcast state must only be allowed when actually processing
broadcast input. If we don't have this check, users can (by mistake) modify broadcast state.
This would lead to incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in {{open()}} and work
with state by calling methods on that) we have to add special wrapping state classes that
only allow modification of state when processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
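> import org.apache.flink.annotation.PublicEvolving;
> import org.apache.flink.api.common.state.State;
> import org.apache.flink.api.common.state.StateDescriptor;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
>
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor {
>   // Mirrors KeyedStateBackend.getPartitionedState().
>   <N, S extends State> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }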
> {code}
> This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to abstract
away the special nature of broadcast state. This is also meant as an internal interface and
is not to be exposed to user functions. Only operators should deal with this.
> {{AbstractStreamOperator}} would get a new method `getBroadcastStateAccessor()` that
returns an implementation of this interface. The implementation would have a {{KeyedStateBackend}}
but wrap the state in special wrappers that only allow modification when processing broadcast
elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state to `OperatorSnapshotResult`.
For example:
> {code}
> private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation logic will
have to be adapted to support this new kind of state. With the ongoing changes in supporting
incremental snapshotting and other new features for `KeyedStateBackend` this should be coordinated
with [~StephanEwen] and/or [~stefanrichter83@gmail.com] and/or [~xiaogang.shi]. We also have
to be very careful about maintaining compatibility with savepoints from older versions.
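
The wrapping state classes described in the issue could look roughly like the sketch below. It is self-contained and deliberately does not use Flink's real state interfaces: `SimpleValueState`, `BroadcastContext`, `InMemoryValueState`, and `GuardedValueState` are hypothetical names introduced here for illustration, and the flag-based check is only one possible way to enforce the restriction.

```java
/** Minimal stand-in for a value state; this is not Flink's ValueState interface. */
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

/** Hypothetical flag that an operator toggles around the processing of a broadcast element. */
final class BroadcastContext {
    private boolean processingBroadcast;

    void enter() { processingBroadcast = true; }
    void exit() { processingBroadcast = false; }
    boolean isProcessingBroadcast() { return processingBroadcast; }
}

/** Plain in-memory state used as the wrapped delegate in this sketch. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() { return current; }

    @Override
    public void update(T newValue) { current = newValue; }
}

/** Wrapper that allows reads at any time but rejects writes outside broadcast processing. */
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BroadcastContext context;

    GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
        this.delegate = delegate;
        this.context = context;
    }

    @Override
    public T value() { return delegate.value(); }

    @Override
    public void update(T newValue) {
        if (!context.isProcessingBroadcast()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element");
        }
        delegate.update(newValue);
    }
}
```

In this sketch the operator would call `enter()` before handing a broadcast element to the user function and `exit()` afterwards, so a user function that obtained the state in `open()` can still read it at any time but fails fast if it tries to write while processing a non-broadcast element.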



