flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6534) SharedStateRegistry is disposing state handles from main thread
Date Fri, 12 May 2017 08:42:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007798#comment-16007798 ]

ASF GitHub Bot commented on FLINK-6534:

Github user StefanRRichter commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
    @@ -18,91 +18,137 @@
     package org.apache.flink.runtime.state;
    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;
      * A {@code SharedStateRegistry} will be deployed in the 
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
     public class SharedStateRegistry {
     	private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
     	/** All registered state objects by an artificial key */
    -	private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
    +	private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry>
    +	/** Executor for async state deletion */
    +	private final Executor asyncDisposalExecutor;
     	public SharedStateRegistry() {
     		this.registeredStates = new HashMap<>();
    +		this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
    --- End diff --
    I totally agree that there should not be a new executor; that is why I marked it with
the TODO. This is just a preparation for the full fix of FLINK-6534. My plan for the full
fix is to pass the IO executor from the `CompletedCheckpointStore` into the registry and use it
there for disposal, which will then happen outside of any synchronization. For now, this code
is a working placeholder for the full fix, which I will do as a follow-up.
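To make the difference between the placeholder and the planned fix concrete, here is a minimal Java sketch that assumes only the standard `java.util.concurrent.Executor` interface. `DisposalExecutorSketch`, `DIRECT`, and `disposalThread` are hypothetical names, not Flink API. `DIRECT` stands in for what a direct executor generally does (run each task inline on the calling thread), and the `Executors` imported here is the JDK class, not Flink's `org.apache.flink.runtime.concurrent.Executors`. With the direct executor, "async" disposal still runs on the caller's thread; with an injected IO executor, it runs on a pool thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisposalExecutorSketch {

    /** Stand-in for a direct executor: runs each task synchronously on the calling thread. */
    static final Executor DIRECT = Runnable::run;

    /**
     * Hands a dummy disposal task to the given executor and returns the name of the
     * thread that actually ran it.
     */
    static String disposalThread(Executor asyncDisposalExecutor) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        asyncDisposalExecutor.execute(() -> ranOn.complete(Thread.currentThread().getName()));
        return ranOn.join(); // waits for the task if it runs on another thread
    }

    public static void main(String[] args) {
        // Placeholder behaviour: the disposal task still runs on the caller's thread.
        System.out.println(disposalThread(DIRECT)); // prints "main"

        // Planned behaviour (assumed): an IO executor owned by the caller, e.g. the
        // checkpoint store, is passed in, so disposal runs on one of its threads.
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            System.out.println(disposalThread(ioExecutor)); // prints "io-thread"
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

Passing the executor in rather than creating one inside the registry matches the review point above: the registry should not own a new thread pool, only use one supplied by its owner.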

> SharedStateRegistry is disposing state handles from main thread
> ---------------------------------------------------------------
>                 Key: FLINK-6534
>                 URL: https://issues.apache.org/jira/browse/FLINK-6534
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.3.0
> Currently, the {{SharedStateRegistry}} deletes state handles that are no longer referenced
> while holding the registry's lock and from the main thread. We should use the {{CheckpointCoordinator}}'s
> async IO executor to make this non-blocking.
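The issue concerns lock scope as much as threads. A common way to address both is to decide what to delete while holding the lock, but to perform the deletion only after releasing it. The following Java sketch is not the Flink implementation: it assumes a hypothetical `DiscardableHandle` interface and plain string keys in place of `SharedStateHandle` and `SharedStateRegistryKey`. Reference counts are updated under the lock, and a handle whose count drops to zero is handed to the injected executor after the `synchronized` block ends.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

public class RefCountingRegistrySketch {

    /** Hypothetical handle type standing in for a shared state handle. */
    public interface DiscardableHandle {
        void discardState() throws Exception;
    }

    private static final class Entry {
        final DiscardableHandle handle;
        int refCount;

        Entry(DiscardableHandle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();
    private final Executor asyncDisposalExecutor;

    public RefCountingRegistrySketch(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = asyncDisposalExecutor;
    }

    /** Adds a reference; the first handle registered for a key is kept. Returns the new count. */
    public int register(String key, DiscardableHandle handle) {
        synchronized (registeredStates) {
            Entry entry = registeredStates.computeIfAbsent(key, k -> new Entry(handle));
            return ++entry.refCount;
        }
    }

    /** Drops one reference; at zero, the handle is discarded outside the lock. */
    public int unregister(String key) {
        DiscardableHandle toDiscard = null;
        int remaining;
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                throw new IllegalStateException("Unknown key: " + key);
            }
            remaining = --entry.refCount;
            if (remaining == 0) {
                registeredStates.remove(key);
                toDiscard = entry.handle;
            }
        }
        // The lock is released here; the potentially slow discard goes to the executor.
        if (toDiscard != null) {
            final DiscardableHandle handle = toDiscard;
            asyncDisposalExecutor.execute(() -> {
                try {
                    handle.discardState();
                } catch (Exception e) {
                    // A real registry would log this; the sketch only reports it.
                    System.err.println("Failed to discard " + key + ": " + e);
                }
            });
        }
        return remaining;
    }

    /** Exposes the lock object so callers can check it is not held during disposal. */
    Object lock() {
        return registeredStates;
    }

    public static void main(String[] args) {
        RefCountingRegistrySketch registry = new RefCountingRegistrySketch(Runnable::run);
        registry.register("sst-1", () -> System.out.println(
                "discarding sst-1, lock held: " + Thread.holdsLock(registry.lock())));
        registry.register("sst-1", () -> { }); // a second checkpoint references the same file
        System.out.println(registry.unregister("sst-1")); // prints 1
        System.out.println(registry.unregister("sst-1")); // prints the discard line, then 0
    }
}
```

Running `main` prints `1`, then `discarding sst-1, lock held: false`, then `0`. Even with an inline executor, the discard happens after the lock is released; passing in an IO executor would additionally move it off the calling thread.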

This message was sent by Atlassian JIRA
