flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3761) Introduce key group state backend
Date Tue, 24 May 2016 15:32:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298355#comment-15298355 ]

ASF GitHub Bot commented on FLINK-3761:

Github user tillrohrmann commented on the pull request:

    Thanks for the initial feedback @aljoscha :-)
    The introduction of `PartitionedState` is indeed not strictly necessary for this PR. The
idea was that we will have partitioned and non-partitioned state in the future. `PartitionedState`
is the key-value state backed by the `PartitionedStateBackend` whereas non-partitioned state
is backed by the `AbstractStateBackend`. The first non-partitioned state (apart from the state
serialized via `CheckpointStateOutputStream`) could be the redistributable non-partitioned
state necessary for the `KafkaSources`, for example. Thus, the `PartitionedState` is more
of a logical separation and it lays the foundation so that also non-keyed stream operators
can use a proper state abstraction. But I can revert it if you deem it redundant or premature.
    It is true that the `PartitionedStateBackend` and the `KeyGroupStateBackend` have **almost**
the same signature. However, the changes you've mentioned are imho crucial and made the whole
refactoring of the state backends necessary in the first place. The difference is that the
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to snapshot and
restore each key group individually. Trying to work around this would mean that the `PartitionedStateBackend`
is always associated with a single key group. But for that, it would have to know the subtask
index of the enclosing `StreamOperator` to assign a sensible key group index. Furthermore,
it wouldn't make sense to use any other `PartitionedStateBackend` than the `KeyGroupStateBackend`
(given that it respects the `KeyGroupAssigner`) for the `AbstractStreamOperator`, because
the data is shuffled according to the key group assignments. In general, I think the notion
of key groups touches too many parts of the Flink runtime for it to still make sense
to try to unify the `KeyGroupStateBackend` and the `PartitionedStateBackend`. The state
backends used by the `AbstractStreamOperator` have to be aware of that notion.
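
To make the difference concrete, here is a minimal sketch of a key-group-aware store. It is not the code from the pull request: `KeyGroupedStore`, its methods, and the hash-modulo assignment rule are hypothetical and only illustrate why a backend that knows its key groups can snapshot and restore each group individually.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink's API: state is bucketed by key group. */
final class KeyGroupedStore<K, V> {
    private final int numberOfKeyGroups;
    // key group index -> (key -> value)
    private final Map<Integer, Map<K, V>> groups = new HashMap<>();

    KeyGroupedStore(int numberOfKeyGroups) {
        if (numberOfKeyGroups <= 0) {
            throw new IllegalArgumentException("numberOfKeyGroups must be positive");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Assumed assignment rule (hash modulo); the real KeyGroupAssigner may differ. */
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    void put(K key, V value) {
        groups.computeIfAbsent(keyGroupOf(key), g -> new HashMap<>()).put(key, value);
    }

    V get(K key) {
        Map<K, V> group = groups.get(keyGroupOf(key));
        return group == null ? null : group.get(key);
    }

    /** Copies the state of a single key group, independently of all others. */
    Map<K, V> snapshotKeyGroup(int keyGroup) {
        Map<K, V> copy = new HashMap<>();
        Map<K, V> group = groups.get(keyGroup);
        if (group != null) {
            copy.putAll(group);
        }
        return copy;
    }

    /** Restores a single key group, for example on a different subtask after rescaling. */
    void restoreKeyGroup(int keyGroup, Map<K, V> snapshot) {
        groups.put(keyGroup, new HashMap<>(snapshot));
    }
}
```

Because a snapshot is taken per key group, a group can be handed to another store instance without touching the keys of any other group.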
    You can regard the `PartitionedStateBackend` as an internal class which was introduced
to reuse the existing state backend implementations via the `GenericKeyGroupStateBackend`.
In the future it might make sense to directly implement the `KeyGroupStateBackend` interface
to decrease the key group overhead. It's just unfortunate that Java's package-private visibility
does not extend to sub-packages. Otherwise, I would have declared `createPartitionedStateBackend`
as package private. But since the `GenericKeyGroupStateBackend` resides in a sub-package of
`o.a.f.runtime.state`, it cannot access this method. But I think we could refactor it the
following way: remove `createPartitionedStateBackend`, make `createKeyGroupStateBackend` abstract,
let the implementations of `AbstractStateBackend` implement a `PartitionedStateBackendFactory`
interface, and have every `AbstractStateBackend` implementation define `createKeyGroupStateBackend`
by creating a `GenericKeyGroupStateBackend`, which requires a `PartitionedStateBackendFactory`.
That would probably be a better design.
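
The proposed refactoring could look roughly like the sketch below. The type names come from the discussion above, but every signature here is a simplified stand-in rather than the actual Flink interface. `InMemoryExampleBackend` is a hypothetical implementation, and creating one partitioned backend per key group is an assumption about how `GenericKeyGroupStateBackend` might use the factory.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins: the real interfaces are typed and far richer.
interface PartitionedStateBackend {
    void put(String key, String value);
    String get(String key);
}

interface PartitionedStateBackendFactory {
    PartitionedStateBackend createPartitionedStateBackend();
}

interface KeyGroupStateBackend {
    void put(int keyGroup, String key, String value);
    String get(int keyGroup, String key);
}

/** Depends only on the public factory interface, so its package no longer matters. */
final class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
    private final PartitionedStateBackendFactory factory;
    // Assumption: one partitioned backend per key group, created lazily.
    private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory) {
        this.factory = factory;
    }

    private PartitionedStateBackend backendFor(int keyGroup) {
        return backends.computeIfAbsent(keyGroup, g -> factory.createPartitionedStateBackend());
    }

    @Override
    public void put(int keyGroup, String key, String value) {
        backendFor(keyGroup).put(key, value);
    }

    @Override
    public String get(int keyGroup, String key) {
        return backendFor(keyGroup).get(key);
    }
}

abstract class AbstractStateBackend {
    // Abstract, as proposed; createPartitionedStateBackend is no longer declared here.
    public abstract KeyGroupStateBackend createKeyGroupStateBackend();
}

/** Hypothetical implementation that acts as its own factory. */
final class InMemoryExampleBackend extends AbstractStateBackend
        implements PartitionedStateBackendFactory {

    @Override
    public PartitionedStateBackend createPartitionedStateBackend() {
        final Map<String, String> store = new HashMap<>();
        return new PartitionedStateBackend() {
            @Override
            public void put(String key, String value) {
                store.put(key, value);
            }

            @Override
            public String get(String key) {
                return store.get(key);
            }
        };
    }

    @Override
    public KeyGroupStateBackend createKeyGroupStateBackend() {
        return new GenericKeyGroupStateBackend(this);
    }
}
```

With this shape, the generic key group backend never calls a method on `AbstractStateBackend` itself, which sidesteps the visibility problem described above.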

> Introduce key group state backend
> ---------------------------------
>                 Key: FLINK-3761
>                 URL: https://issues.apache.org/jira/browse/FLINK-3761
>             Project: Flink
>          Issue Type: Sub-task
>          Components: state backends
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
> After an off-line discussion with [~aljoscha], we came to the conclusion that it would
be beneficial to reflect the differences between a keyed and a non-keyed stream also in the
state backends. A state backend which is used for a keyed stream offers value, list, folding
and reducing state and has to group its keys into key groups.
> A state backend for non-keyed streams can only offer a union state to make it work with
dynamic scaling. A union state is a state which is broadcast to all tasks in case of a recovery.
The state backends can then select what information they need to recover from the whole state
(formerly distributed).
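
As an illustration of the union state idea, the sketch below gives every task the whole state on recovery and lets each task select its share. `UnionStateExample` and the round-robin selection rule are hypothetical; the issue does not prescribe how a task chooses its part.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of union state redistribution, not Flink's API. */
final class UnionStateExample {
    private UnionStateExample() {
    }

    /** Builds the union of the formerly distributed per-task state. */
    static <T> List<T> union(List<List<T>> perTaskState) {
        List<T> all = new ArrayList<>();
        for (List<T> taskState : perTaskState) {
            all.addAll(taskState);
        }
        return all;
    }

    /** Assumed selection rule: a task takes every parallelism-th element. */
    static <T> List<T> select(List<T> unionState, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<T> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < unionState.size(); i += parallelism) {
            mine.add(unionState.get(i));
        }
        return mine;
    }
}
```

Since every task sees the full union, the selection keeps working when the parallelism after recovery differs from the parallelism at snapshot time.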

This message was sent by Atlassian JIRA