flink-issues mailing list archives

From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level
Date Tue, 25 Apr 2017 16:33:49 GMT
GitHub user zentol opened a pull request:


    [FLINK-5892] Restore state on the operator level

    ## General
    This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state
on the operator level. This means that the topology of a job may change with regard to chains
when restoring from a 1.3 savepoint, allowing the arbitrary addition, removal or modification
of chains.
    The cornerstone for this is a semantic change for savepoints (no structural changes have
been made to the `SavepointV0/1/2` classes or their serialized format):
    In 1.2 a savepoint contains the states of tasks. If a task consists of multiple operators
then the stored TaskState internally contains a list of states, one entry for each operator.
    In 1.3 a savepoint contains the states of operators only; the notion of tasks is eliminated.
If a task consists of multiple operators we store one TaskState for each operator instead.
Internally they each contain a list of states with a length of 1.
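    As a rough illustration of why operator-level state permits chain changes, the sketch
below restores state by operator ID alone. It is a simplified model, not Flink's implementation:
`OperatorLevelRestoreSketch`, `assignStates`, the string IDs and the string states are all
hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; none of these types are Flink classes. */
public class OperatorLevelRestoreSketch {

    /**
     * Looks up the saved state of every operator in every chain of the new topology.
     * The lookup key is the operator ID, so the result does not depend on how the
     * operators were chained when the savepoint was taken.
     */
    public static List<List<String>> assignStates(
            Map<String, String> savepoint, List<List<String>> newChains) {
        List<List<String>> assigned = new ArrayList<>();
        for (List<String> chain : newChains) {
            List<String> chainStates = new ArrayList<>();
            for (String operatorId : chain) {
                // In this sketch, an operator unknown to the savepoint gets null (no state).
                chainStates.add(savepoint.get(operatorId));
            }
            assigned.add(chainStates);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Hypothetical 1.3-style savepoint: one entry per operator.
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("source", "offset=42");
        savepoint.put("map", "count=7");

        // Restore into a topology where the former chain [source, map] is broken up.
        List<List<String>> unchained =
                Arrays.asList(Arrays.asList("source"), Arrays.asList("map"));
        System.out.println(assignStates(savepoint, unchained));
    }
}
```

    With a task-level (1.2-style) layout the lookup key would be the task, so splitting or
merging a chain would leave no matching entry to restore from.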
    ## Implementation
    In order for this to work a number of changes had to be made.
    First and foremost we required a new `StateAssignmentOperation` that was aware of operators.
    (74881a2, 8be9c58, 4fa8bbd)
    Since the SAO uses the `ExecutionGraph` classes to map the restored state it was necessary
to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the
    The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received
`SubtaskStates`, containing the state of a task, are broken down into SubtaskStates for the
individual operators.
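    The breakdown of an acknowledged task-level state into per-operator entries can be pictured
roughly as follows. This is a minimal sketch under assumptions, not the actual `PendingCheckpoint`
code: `AcknowledgeSplitSketch`, its `acknowledge` method and the string-typed IDs and states are
hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; this is not Flink's PendingCheckpoint. */
public class AcknowledgeSplitSketch {

    /** operatorId -> (subtaskIndex -> state): one entry per operator, not per task. */
    private final Map<String, Map<Integer, String>> operatorStates = new LinkedHashMap<>();

    /**
     * Records the acknowledgment of one subtask of a task. The two lists are
     * positionally aligned: chainedStates.get(i) belongs to chainedOperatorIds.get(i).
     */
    public void acknowledge(
            int subtaskIndex, List<String> chainedOperatorIds, List<String> chainedStates) {
        if (chainedOperatorIds.size() != chainedStates.size()) {
            throw new IllegalArgumentException("one state per chained operator expected");
        }
        for (int i = 0; i < chainedOperatorIds.size(); i++) {
            // Split the task-level state: file each piece under its own operator ID.
            operatorStates
                    .computeIfAbsent(chainedOperatorIds.get(i), id -> new LinkedHashMap<>())
                    .put(subtaskIndex, chainedStates.get(i));
        }
    }

    public Map<String, Map<Integer, String>> getOperatorStates() {
        return operatorStates;
    }
}
```

    Keying the collected state by operator ID at acknowledgment time means the completed
checkpoint already has the operator-level shape that restoring expects.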
    ## Tests
    The majority of this PR consists of new tests (roughly 60%).
    A number of tests were added under flink-tests that test the migration path from 1.2 to
    These tests first restore a job from a 1.2 savepoint, without changes to the topology,
verify that the state was restored correctly and finally create a new savepoint. They then
restore from this migrated 1.3 savepoint, with changes to the topology for varying scenarios,
and verify the correct restoration of state again.
    A new test was also added to the `CheckpointCoordinatorTest` which tests the support for
topology changes without executing a job.
    A number of existing tests had to be tweaked to run with the new changes, but these changes
all boil down to extending existing mocks by a method or two.
    ## Other changes
    To make it more obvious that we deal with operators and not tasks, a new `OperatorID` class
was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced when

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3770

commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol <chesnay@apache.org>
Date:   2017-04-03T15:39:50Z

    [prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol <chesnay@apache.org>
Date:   2017-04-04T10:53:56Z

    [internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol <chesnay@apache.org>
Date:   2017-04-04T13:02:55Z

    [internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol <chesnay@apache.org>
Date:   2017-04-04T11:33:54Z

    [internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol <chesnay@apache.org>
Date:   2017-04-25T16:07:16Z

    [internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw <guowei.mgw@gmail.com>
Date:   2017-04-24T09:47:47Z

    [internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol <chesnay@apache.org>
Date:   2017-04-04T13:01:07Z

    [internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol <chesnay@apache.org>
Date:   2017-04-03T15:40:21Z

    [tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol <chesnay@apache.org>
Date:   2017-04-24T11:58:07Z

    [tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw <guowei.mgw@gmail.com>
Date:   2017-04-24T10:13:44Z

    [tests] Add tests for topology modifications

commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol <chesnay@apache.org>
Date:   2017-04-25T14:08:07Z

    [internal] Introduce OperatorID for state business


If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
