flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling
Date Tue, 16 Aug 2016 14:59:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422848#comment-15422848 ]

ASF GitHub Bot commented on FLINK-3755:

GitHub user StefanRRichter opened a pull request:


    [FLINK-3755] Introduce key groups for key-value state to support dynamic scaling

    This pull request introduces the concept of key groups to Flink. A key group is the smallest
assignable unit of key-value state to a stream operator. Put differently, it is a subset
of the key space which can be assigned to a stream operator. With the introduction of key
groups, it will be possible to dynamically scale Flink operators that use partitioned (=key-value)
state.
    In particular, this pull request addresses the following sub-issues:
    - fully: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    - partially: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends
    Furthermore, this pull request is partially based on pull request: #1988
    Overall, this pull request introduces the following changes:
    # 1) Adopted from #1988 (plus introduction of distributing keys as ranges (`KeyGroupRange`))

    ## a) Introduction of KeyGroupAssigner
    In order to partition keys into key groups, the `KeyGroupAssigner` interface is introduced.
This allows for partitioning the key space into smaller units which can be assigned to operators.
A scale up/down of parallelism is then performed by simply reassigning the key groups to more/less
    For this pull request, the former `HashPartitioner` is now renamed to `KeyGroupStreamPartitioner`
and uses the `KeyGroupAssigner` to distribute the streaming records in a consistent way w.r.t.
the key group mappings. The key groups, in turn, are mapped as ranges of key groups (`key
group index * parallelism / number of key groups` = out-going channel) to the downstream tasks.
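
The routing rule above can be sketched in a few lines of Java. This is a minimal illustration, not the code from the pull request: the class and method names are hypothetical, and assigning a key to a key group by hash modulo is an assumption. Only the group-to-channel formula is taken from the description.

```java
// Sketch only: names are hypothetical and the hash-based key-to-group step is
// an assumption; the group-to-channel formula is the one quoted in the text.
final class KeyGroupRoutingSketch {

    /** Assumed assignment of a key to a key group: hash code modulo group count. */
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        // Math.floorMod keeps the result non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** key group index * parallelism / number of key groups = out-going channel. */
    static int selectChannel(int keyGroupIndex, int parallelism, int numberOfKeyGroups) {
        return keyGroupIndex * parallelism / numberOfKeyGroups;
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8;
        for (int parallelism : new int[] {2, 4}) {
            StringBuilder line = new StringBuilder("parallelism " + parallelism + ":");
            for (int keyGroup = 0; keyGroup < numberOfKeyGroups; keyGroup++) {
                line.append(' ').append(selectChannel(keyGroup, parallelism, numberOfKeyGroups));
            }
            // prints "parallelism 2: 0 0 0 0 1 1 1 1"
            // then   "parallelism 4: 0 0 1 1 2 2 3 3"
            System.out.println(line);
        }
    }
}
```

Because the division is an integer division, consecutive key groups land on the same channel, which is what makes the assignment expressible as contiguous ranges.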

    When restoring from a checkpoint or savepoint, scale up/down of parallelism basically
boils down to splitting/merging the key group ranges in alignment with the adjusting assignment
to operators that happens automatically through the `KeyGroupStreamPartitioner`.
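
To make the splitting and merging of ranges concrete, the following sketch derives the inclusive key group range that an operator index owns. It is obtained by inverting the channel formula quoted earlier; the class and method names are hypothetical and not claimed to match the pull request.

```java
// Sketch only: derives, for one operator index, the inclusive range of key
// groups that the formula "key group index * parallelism / number of key
// groups" routes to it. All names here are illustrative assumptions.
final class KeyGroupRangeSketch {

    /** First key group (inclusive) owned by the operator index. */
    static int rangeStart(int operatorIndex, int parallelism, int numberOfKeyGroups) {
        // Smallest kg with kg * parallelism / numberOfKeyGroups >= operatorIndex.
        return (operatorIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the operator index. */
    static int rangeEnd(int operatorIndex, int parallelism, int numberOfKeyGroups) {
        // Largest kg with kg * parallelism < (operatorIndex + 1) * numberOfKeyGroups.
        return ((operatorIndex + 1) * numberOfKeyGroups - 1) / parallelism;
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8;
        for (int parallelism : new int[] {2, 4}) {
            for (int op = 0; op < parallelism; op++) {
                System.out.println("parallelism " + parallelism + ", operator " + op + ": ["
                        + rangeStart(op, parallelism, numberOfKeyGroups) + ", "
                        + rangeEnd(op, parallelism, numberOfKeyGroups) + "]");
            }
        }
    }
}
```

With 8 key groups, operator 0 owns [0, 3] at parallelism 2; after scaling up to parallelism 4 that range is split into [0, 1] and [2, 3] for operators 0 and 1.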
    ## b) Introduction of MaxParallelism to user API
    In order to scale programs up or down, it is necessary to define the maximum number of
key groups. The maximum number of key groups denotes the maximum parallelism of an operator,
because every parallel operator instance needs at least one key group to get elements assigned.
Thus, in order to specify this upper limit, the ExecutionConfig allows setting a job-wide max
parallelism value via ExecutionConfig.setMaxParallelism. In addition, the SingleOutputStreamOperator
allows setting a max parallelism value per operator, analogous to the parallelism. If neither
an operator-level nor a job-wide max parallelism has been set, the parallelism of the operator
is taken as the max parallelism, so that every parallel operator instance receives a single
key group. Currently, we restrict the maximum number of key groups to about 2^15 (Short.MAX_VALUE,
which is 2^15 - 1 = 32767).
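
The fallback rule for the max parallelism can be summarized as a small resolution function. This is a sketch under assumptions: the `UNSET` sentinel, the method name, and the precedence of the operator-level value over the job-wide value are illustrative and not confirmed by the description, which only states the final fallback to the operator's parallelism.

```java
// Sketch only: resolves the effective max parallelism (= number of key groups)
// for one operator. UNSET and the operator-over-job precedence are assumptions.
final class MaxParallelismSketch {

    /** Hypothetical marker for "no value configured". */
    static final int UNSET = -1;

    static int effectiveMaxParallelism(int operatorMaxParallelism,
                                       int jobMaxParallelism,
                                       int operatorParallelism) {
        if (operatorMaxParallelism != UNSET) {
            return operatorMaxParallelism;   // set on the operator itself
        }
        if (jobMaxParallelism != UNSET) {
            return jobMaxParallelism;        // job-wide default
        }
        // Nothing configured: one key group per parallel operator instance.
        return operatorParallelism;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxParallelism(UNSET, UNSET, 4));  // prints 4
        System.out.println(effectiveMaxParallelism(UNSET, 128, 4));    // prints 128
        System.out.println(effectiveMaxParallelism(64, 128, 4));       // prints 64
    }
}
```

Note that with the last fallback the job cannot be scaled beyond its current parallelism later, because there are no spare key groups to hand out.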
    # 2)  State and StateHandle refactoring
    ## a) StateHandle refactoring
    We have simplified and cleaned up the hierarchy and use cases of state handles. `StreamStateHandle`
and `RetrievableStateHandle` are now at the heart of the state handles system.
    Their main conceptual difference is that `StreamStateHandle` provides a seekable input
stream to the actual state data and leaves state reconstruction to client code, whereas `RetrievableStateHandle`
gives client code a simple way to retrieve state as a readily constructed object, with
the state handle implementation taking care of state reconstruction.
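
The difference between the two styles can be illustrated with two deliberately simplified interfaces. These are hypothetical stand-ins, not the signatures of `StreamStateHandle` or `RetrievableStateHandle`; the sketch also uses a plain `InputStream` and does not model seeking.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Sketch only: hypothetical interfaces that contrast the two handle styles.
final class StateHandleStylesSketch {

    /** Stream style: hands out raw bytes; the caller reconstructs the state. */
    interface StreamStyleHandle {
        InputStream openStream();
    }

    /** Retrievable style: the handle returns the reconstructed object itself. */
    interface RetrievableStyleHandle<T> {
        T retrieve();
    }

    static StreamStyleHandle streamHandle(byte[] data) {
        return () -> new ByteArrayInputStream(data);
    }

    static RetrievableStyleHandle<Long> retrievableLong(byte[] data) {
        // Reconstruction (bytes -> Long) happens inside the handle.
        return () -> ByteBuffer.wrap(data).getLong();
    }

    /** Client-side reconstruction for the stream style. */
    static long readLongFromStream(StreamStyleHandle handle) {
        try (DataInputStream in = new DataInputStream(handle.openStream())) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```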
    ## b) Operator serialization
    The unified abstraction for operators to persist their state is `CheckpointStateOutputStream`.
All operators can write their serialized state directly into this stream, which returns
a `StreamStateHandle` on close. `StreamTaskState` and `StreamTaskStateList` become obsolete.
This change makes versioning of operator state serialization formats easier and we should
ensure and test that our operators are aware of serialization versions.
    This change leaves the following methods for snapshot/restore in `StreamOperator`:
    void snapshotState(
        FSDataOutputStream out, 
        long checkpointId, 
        long timestamp) throws Exception;
    void restoreState(FSDataInputStream in) throws Exception;
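
One way an operator could make its serialization format version-aware is to write a version number ahead of the payload. The sketch below assumes this approach; it is not taken from the pull request. It uses `java.io` streams as stand-ins for `FSDataOutputStream` and `FSDataInputStream`, and the counter operator and its helper are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Sketch only: a hypothetical operator whose state is a single counter.
// java.io streams stand in for Flink's FSDataOutputStream/FSDataInputStream.
final class VersionedCounterSketch {

    private static final int FORMAT_VERSION = 1;

    long count;

    void snapshotState(OutputStream out, long checkpointId, long timestamp) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(FORMAT_VERSION);  // version header first
        data.writeLong(count);          // then the actual state
        data.flush();
    }

    void restoreState(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int version = data.readInt();
        if (version != FORMAT_VERSION) {
            throw new IOException("Unsupported state format version: " + version);
        }
        count = data.readLong();
    }

    /** Snapshots a counter value and restores it into a fresh instance. */
    static long roundTrip(long value) {
        try {
            VersionedCounterSketch source = new VersionedCounterSketch();
            source.count = value;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            source.snapshotState(bytes, 1L, 0L);

            VersionedCounterSketch target = new VersionedCounterSketch();
            target.restoreState(new ByteArrayInputStream(bytes.toByteArray()));
            return target.count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```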
    ## c) Split task state into operator state (= non-partitioned state) and keyed-state (=
partitioned state)
    We have split the operator state into operator state and keyed state as follows. 
    Operator state is organized as a `ChainedStateHandle<StreamStateHandle>`. The chained
state handle encapsulates the individual `StreamStateHandle` for all operators in an operator
chain.
    Keyed state is organized as a `List<KeyGroupsStateHandle>`. Each `KeyGroupsStateHandle`
consists of one `StreamStateHandle` and one `KeyGroupRangeOffsets` object. `KeyGroupRangeOffsets`
denotes the range of all key groups that are referenced by the handle, together with their
respective stream offsets. The `StreamStateHandle` gives access to a seekable stream that
contains the actual state data for all key groups from the key group range; individual key
group states are located in the stream at their previously mentioned stream offsets.
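
The layout described above (one stream, plus a range with one offset per key group) can be sketched as follows. The class is a hypothetical stand-in that combines the roles of `KeyGroupRangeOffsets` and the stream data; a byte array replaces the seekable stream, and deriving each state's length from the next offset is an assumption of this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Sketch only: a byte array stands in for the seekable stream behind a
// StreamStateHandle; offsets play the role of KeyGroupRangeOffsets.
final class KeyGroupOffsetsSketch {

    final int firstKeyGroup;  // start of the key group range
    final long[] offsets;     // offsets[i] = position of key group firstKeyGroup + i
    final byte[] stream;      // concatenated state of all key groups in the range

    private KeyGroupOffsetsSketch(int firstKeyGroup, long[] offsets, byte[] stream) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = offsets;
        this.stream = stream;
    }

    /** Writes one state blob per key group and records where each one starts. */
    static KeyGroupOffsetsSketch write(int firstKeyGroup, String[] statePerKeyGroup) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] offsets = new long[statePerKeyGroup.length];
        for (int i = 0; i < statePerKeyGroup.length; i++) {
            offsets[i] = out.size();
            byte[] bytes = statePerKeyGroup[i].getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        return new KeyGroupOffsetsSketch(firstKeyGroup, offsets, out.toByteArray());
    }

    /** "Seeks" to the recorded offset and reads only that key group's state. */
    String read(int keyGroup) {
        int i = keyGroup - firstKeyGroup;
        if (i < 0 || i >= offsets.length) {
            throw new IllegalArgumentException("Key group " + keyGroup + " not in range");
        }
        int start = (int) offsets[i];
        // Assumption: states are contiguous, so the next offset marks the end.
        int end = (i + 1 < offsets.length) ? (int) offsets[i + 1] : stream.length;
        return new String(stream, start, end - start, StandardCharsets.UTF_8);
    }
}
```

For example, `KeyGroupOffsetsSketch.write(4, new String[] {"a", "bb", "ccc"}).read(5)` returns `"bb"` without touching the bytes of key groups 4 and 6.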
    Notice that we have to provide a list of `KeyGroupsStateHandle` to support the case of
scaling down parallelism. In this case, it can happen that key group states from several `KeyGroupsStateHandle`
(each representing the state of one previously existing operator) have to be combined to form
the keyed state of a reduced number of current operators. 
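
The scale-down case can be illustrated by selecting, for one operator after rescaling, all previously written handles whose key group range overlaps its new range. The `Range` class and method names below are hypothetical; the sketch models only the ranges, not the state data.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models just the key group ranges of the old handles and picks
// those a new operator must read from after scaling down. Names are assumed.
final class ScaleDownSketch {

    /** Inclusive range of key group indices, e.g. [2, 3]. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Range other) {
            return start <= other.end && other.start <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Returns the old handle ranges that overlap the new operator's range. */
    static List<Range> handlesToRead(Range newOperatorRange, List<Range> oldHandleRanges) {
        List<Range> result = new ArrayList<>();
        for (Range old : oldHandleRanges) {
            if (old.intersects(newOperatorRange)) {
                result.add(old);
            }
        }
        return result;
    }
}
```

With 8 key groups and an old parallelism of 4, the handles cover [0, 1], [2, 3], [4, 5] and [6, 7]. After scaling down to parallelism 2, the operator that owns [0, 3] has to read from the first two handles, which is why keyed state is passed as a list.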

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink keyed-state

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2376
commit 8a57da24b499e059fb73bd7050a96d32b57fcec4
Author: Till Rohrmann <trohrmann@apache.org>
Date:   2016-07-28T13:08:24Z

    [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    This introduces a new KeySelector that assigns keys to key groups and
    also adds the max parallelism parameter throughout all API levels.
    This also adds tests for the newly introduced features.

commit 62fb798b762d8a69d30479561ed43b580facc600
Author: Stefan Richter <s.richter@data-artisans.com>
Date:   2016-08-11T09:59:07Z

    [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

commit c038d6d9435c329ab4ca06c119cff5456924b5ab
Author: Till Rohrmann <trohrmann@apache.org>
Date:   2016-08-11T10:14:18Z

    [FLINK-4380] Add tests for new Key-Group/Max-Parallelism
    This tests the rescaling features in CheckpointCoordinator and

commit 751effb855a81e6322a7e897c98dc59ea065d072
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2016-08-12T09:07:09Z

    Ignore RescalingITCase


> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>                 Key: FLINK-3755
>                 URL: https://issues.apache.org/jira/browse/FLINK-3755
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
> In order to support dynamic scaling, it is necessary to sub-partition the key-value states
of each operator. This sub-partitioning, which produces a set of key groups, makes it easy to
scale Flink jobs in and out by simply reassigning the different key groups to the new set
of sub tasks. The idea of key groups is described in this design document [1]. 
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing

This message was sent by Atlassian JIRA
