flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8360) Implement task-local state recovery
Date Thu, 01 Feb 2018 07:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348132#comment-16348132 ]

ASF GitHub Bot commented on FLINK-8360:

Github user tillrohrmann commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
    @@ -109,14 +109,20 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return operatorStateCheckpointOutputStream;
    -	public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() throws IOException {
    -		KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
    -		return new DoneFuture<KeyedStateHandle>(keyGroupsStateHandle);
    +	public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
    +		return getGenericStateStreamFuture(keyedStateCheckpointOutputStream);
    -	public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException {
    -		OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
    -		return new DoneFuture<>(operatorStateHandle);
    +	public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
    +		return getGenericStateStreamFuture(operatorStateCheckpointOutputStream);
    +	}
    +	private <T extends StateObject> RunnableFuture<SnapshotResult<T>> getGenericStateStreamFuture(
    +		NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
    +		T operatorStateHandle = (T) closeAndUnregisterStreamToObtainStateHandle(stream);
    --- End diff --
    This cast seems a bit fishy to me. I think it should not be necessary if the generics are applied correctly. A way to solve it would be `T extends StreamStateHandle` and `RunnableFuture<? extends SnapshotResult<? extends KeyedStateHandle>> getKeyedStateStreamFuture()`.
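The reviewer's suggestion can be sketched in plain Java. The types below are hypothetical, simplified stand-ins that only borrow names from the diff (`StateObject`, `StreamStateHandle`, `KeyedStateHandle`, `KeyGroupsStateHandle`, `SnapshotResult`, `NonClosingCheckpointOutputStream`); they are not Flink's actual classes. The sketch assumes the stream type carries its handle type as `NonClosingCheckpointOutputStream<T extends StreamStateHandle>` and exposes a hypothetical `closeAndGetHandle()`; the registry unregistration performed by `closeAndUnregisterStreamToObtainStateHandle` is omitted, and a `FutureTask` that has already been run stands in for `DoneFuture`. With `T` bounded by `StreamStateHandle` and the parameter typed `NonClosingCheckpointOutputStream<T>`, the compiler infers `T` from the argument, so the `(T)` cast is no longer needed. The wildcard return type then allows a future of `SnapshotResult<KeyGroupsStateHandle>` to be returned where a keyed-state result is expected.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class GenericSnapshotSketch {

    // Hypothetical, simplified stand-ins for the state-handle hierarchy.
    interface StateObject {}
    interface StreamStateHandle extends StateObject {}
    interface KeyedStateHandle extends StateObject {}

    // Plays the role of KeyGroupsStateHandle: both a stream handle and a keyed handle.
    static final class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle {}

    // Hypothetical minimal result wrapper around one snapshot handle.
    static final class SnapshotResult<T extends StateObject> {
        private final T snapshot;

        SnapshotResult(T snapshot) {
            this.snapshot = snapshot;
        }

        T getSnapshot() {
            return snapshot;
        }
    }

    // The stream is parameterized by the exact handle type it yields on close.
    abstract static class NonClosingCheckpointOutputStream<T extends StreamStateHandle> {
        abstract T closeAndGetHandle() throws IOException;
    }

    static final class KeyedStateCheckpointOutputStream
            extends NonClosingCheckpointOutputStream<KeyGroupsStateHandle> {
        @Override
        KeyGroupsStateHandle closeAndGetHandle() {
            return new KeyGroupsStateHandle();
        }
    }

    // T is inferred from the stream argument, so no unchecked (T) cast is needed.
    static <T extends StreamStateHandle> RunnableFuture<SnapshotResult<T>> getGenericStateStreamFuture(
            NonClosingCheckpointOutputStream<T> stream) throws IOException {
        T handle = stream.closeAndGetHandle();
        // Stand-in for DoneFuture: a FutureTask that is run immediately is already complete.
        FutureTask<SnapshotResult<T>> done = new FutureTask<SnapshotResult<T>>(() -> new SnapshotResult<>(handle));
        done.run();
        return done;
    }

    // The wildcards accept a RunnableFuture<SnapshotResult<KeyGroupsStateHandle>>.
    static RunnableFuture<? extends SnapshotResult<? extends KeyedStateHandle>> getKeyedStateStreamFuture(
            KeyedStateCheckpointOutputStream stream) throws IOException {
        return getGenericStateStreamFuture(stream);
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ExecutionException {
        SnapshotResult<? extends KeyedStateHandle> result =
                getKeyedStateStreamFuture(new KeyedStateCheckpointOutputStream()).get();
        KeyedStateHandle handle = result.getSnapshot();
        System.out.println(handle instanceof KeyGroupsStateHandle); // prints "true"
    }
}
```

Callers of `getKeyedStateStreamFuture()` can still read a `KeyedStateHandle` from the result; the wildcards only prevent them from putting a different handle type into it, which they have no reason to do.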

> Implement task-local state recovery
> -----------------------------------
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
> This issue tracks the development of recovery from task-local state. The main idea is to have a secondary, local copy of the checkpointed state, while there is still a primary copy in DFS that we report to the checkpoint coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, to save network bandwidth. This requires that the assignment from tasks to slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed state and can easily extend it to all other state types (e.g. operator state) later.
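The restore order described in the issue (secondary local copy first, primary DFS copy as fallback) can be sketched as follows. Everything here is a hypothetical illustration, not Flink code: `TaskStateSnapshot`, `Source`, `restore`, the `Predicate`-based restore callback and the example paths are all assumptions. A missing local copy models the case where a task was not scheduled back onto the same slot, which is why sticky task-to-slot assignment matters.

```java
import java.util.Optional;
import java.util.function.Predicate;

public class LocalRecoverySketch {

    // Hypothetical: which copy a restore was served from.
    enum Source { LOCAL, PRIMARY }

    // Hypothetical pair of copies for one task's checkpointed state.
    static final class TaskStateSnapshot {
        final String primaryLocation;         // copy in DFS, reported to the checkpoint coordinator
        final Optional<String> localLocation; // secondary task-local copy, if this machine has one

        TaskStateSnapshot(String primaryLocation, Optional<String> localLocation) {
            this.primaryLocation = primaryLocation;
            this.localLocation = localLocation;
        }
    }

    // Tries the local copy first and falls back to the primary copy.
    // restoreFrom returns true if restoring from the given location succeeded.
    static Source restore(TaskStateSnapshot snapshot, Predicate<String> restoreFrom) {
        if (snapshot.localLocation.isPresent() && restoreFrom.test(snapshot.localLocation.get())) {
            return Source.LOCAL; // served from the local copy, avoiding a DFS read
        }
        if (restoreFrom.test(snapshot.primaryLocation)) {
            return Source.PRIMARY;
        }
        throw new IllegalStateException("Could not restore from " + snapshot.primaryLocation);
    }

    public static void main(String[] args) {
        // Example locations are made up for illustration.
        TaskStateSnapshot sameSlot =
                new TaskStateSnapshot("hdfs:///checkpoints/chk-42/task-0", Optional.of("/local/chk-42/task-0"));
        TaskStateSnapshot otherSlot =
                new TaskStateSnapshot("hdfs:///checkpoints/chk-42/task-0", Optional.empty());

        Predicate<String> allSucceed = location -> true;
        Predicate<String> localBroken = location -> location.startsWith("hdfs:");

        System.out.println(restore(sameSlot, allSucceed));  // prints "LOCAL"
        System.out.println(restore(otherSlot, allSucceed)); // prints "PRIMARY"
        System.out.println(restore(sameSlot, localBroken)); // prints "PRIMARY"
    }
}
```

Falling back instead of failing keeps the primary DFS copy authoritative: the local copy is only an optimization, so a missing or unusable local copy costs network bandwidth but not correctness.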

This message was sent by Atlassian JIRA
