flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
Date Wed, 22 Mar 2017 13:30:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936305#comment-15936305

ASF GitHub Bot commented on FLINK-6034:

Github user StefanRRichter commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
    +	 * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +	 * key group index for the given subtask {@link KeyGroupRange}.
    +	 * <p>
    +	 * <p>This is publicly visible to be used in tests.
    +	 */
    +	public static List<KeyedStateHandle> getKeyedStateHandles(
    +			Collection<? extends KeyedStateHandle> keyedStateHandles,
    +			KeyGroupRange subtaskKeyGroupRange) {
    +		List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
    +		for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +			KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
    --- End diff --
    I wonder if we could somehow introduce a `KeyedStateHandle::intersect(KeyGroupRange)`
that again returns a `KeyedStateHandle` with a `KeyGroupRage` that is the intersection of
the original range and the argument. Basically a higher level version of what the KeyGroupsStateHandle
can do, and the concrete implementations (like `KeyGroupsStateHandle`) know how the virtually
split themselves up into a sub-range. This also would transfer less data in the RPC (less
offsets) and saves the post-filtering in the backend. 
    Otherwise, we could have a boolean method for just checking intersection, because there
is no need to create `KeyGroupRange` objects anymore, because we do not actually use them.

> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>                 Key: FLINK-6034
>                 URL: https://issues.apache.org/jira/browse/FLINK-6034
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
> Currently, the only type of the snapshots in keyed streams is {{KeyGroupsStateHandle}}
which is full and store the states one group after another. With the introduction of incremental
checkpointing, we need a higher level abstraction of keyed snapshots to allow flexible snapshot
> The implementation of {{KeyedStateHandle}} s may vary a lot in different backends. The
only information needed in {{KeyedStateHandle}} s is their key group range. When recovering
the job with a different degree of parallelism, {{KeyedStateHandle}} s will be assigned to
those subtasks whose key group ranges overlap with their ranges.

This message was sent by Atlassian JIRA

View raw message