flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
Date Wed, 26 Apr 2017 21:45:04 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Stephan Ewen resolved FLINK-6390.
    Resolution: Fixed

Implemented in 90ca438106e63c5032ee2ad27e54e9f573eac386

> Add Trigger Hooks to the Checkpoint Coordinator
> -----------------------------------------------
>                 Key: FLINK-6390
>                 URL: https://issues.apache.org/jira/browse/FLINK-6390
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
> Some source systems need to be notified before a checkpoint starts, in order to do preparatory work for the checkpoint.
> I propose to add an interface that allows sources to register hooks that are called by the checkpoint coordinator when triggering or restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
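To make "state stored with the checkpoint metadata" concrete, here is a minimal sketch, not Flink's actual metadata format, of how serialized hook state could be kept keyed by hook identifier. The class `HookStateMetadata`, its methods, and the identifier strings are hypothetical. As a simplifying assumption, it applies to stored entries the same "only the first one is used" rule that the proposal's `getIdentifier()` javadoc states for hooks sharing an identifier.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: hook state kept in checkpoint metadata, keyed by hook identifier.
class HookStateMetadata {

    // Serialized state per hook identifier; hooks that store nothing simply have no entry.
    private final Map<String, byte[]> stateByIdentifier = new LinkedHashMap<>();

    /**
     * Stores the serialized state of a hook. If state for the same identifier is already
     * present, the call is ignored ("first one wins") and false is returned.
     */
    boolean put(String identifier, byte[] serializedState) {
        if (stateByIdentifier.containsKey(identifier)) {
            return false;
        }
        stateByIdentifier.put(identifier, serializedState);
        return true;
    }

    /** Returns the state stored for this identifier, or null if there is none. */
    byte[] get(String identifier) {
        return stateByIdentifier.get(identifier);
    }

    public static void main(String[] args) {
        HookStateMetadata metadata = new HookStateMetadata();
        // Identifier modeled on "storage system name + namespace", as the proposal suggests.
        metadata.put("external-system:namespace-a", "marker-42".getBytes(StandardCharsets.UTF_8));
        // Same identifier again: ignored, the first entry is kept.
        metadata.put("external-system:namespace-a", "marker-7".getBytes(StandardCharsets.UTF_8));

        byte[] restored = metadata.get("external-system:namespace-a");
        System.out.println(new String(restored, StandardCharsets.UTF_8)); // prints "marker-42"
        System.out.println(metadata.get("some-other-hook") == null);     // prints "true"
    }
}
```

A hook whose identifier has no entry receives `null` on restore, which matches the `@Nullable T checkpointData` parameter in the proposed interface.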
> Envisioned interface for the hooks
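> {code}
> import org.apache.flink.core.io.SimpleVersionedSerializer;
>
> import javax.annotation.Nullable;
>
> import java.util.concurrent.Executor;
> import java.util.concurrent.Future;
>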
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator when triggering or
>  * restoring a checkpoint. Such a hook is useful, for example, when preparing external systems for
>  * taking or restoring checkpoints.
>  * 
>  * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data to the hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
>  * The identifier has a role similar to, for example, the operator UID in the streaming API.
>  *
>  * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
>  *
>  * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata.
>  *            If the hook never stores any data, this can be typed to {@code Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
> 	/**
> 	 * Gets the identifier of this hook. The identifier is used to identify a specific hook in the
> 	 * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration.
> 	 * 
> 	 * <p>The identifier should be unique among the different hooks of a job, but deterministic/constant,
> 	 * so that upon resuming a savepoint, the hook will get the correct data.
> 	 * For example, if the hook calls into another storage system and persists namespace/schema
> 	 * information, then the name of the storage system, together with the namespace/schema name, could
> 	 * be an appropriate identifier.
> 	 * 
> 	 * <p>When multiple hooks with the same identifier are created and attached to a job graph, only the first
> 	 * one is actually used. This can be exploited to deduplicate hooks that would do the same thing.
> 	 *
> 	 * @return The identifier of the hook.
> 	 */
> 	String getIdentifier();
> 	/**
> 	 * This method is called by the checkpoint coordinator when triggering a checkpoint, prior
> 	 * to sending the "trigger checkpoint" messages to the source tasks.
> 	 * 
> 	 * <p>If the hook implementation wants to store data as part of the checkpoint, it may return
> 	 * that data via a future; otherwise it should return null. The data is stored as part of
> 	 * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}).
> 	 * 
> 	 * <p>If the action by this hook needs to be executed synchronously, then this method should
> 	 * directly execute the action synchronously and block until it is complete. The returned future
> 	 * (if any) would typically be a completed future.
> 	 * 
> 	 * <p>If the action should be executed asynchronously and only needs to complete before the
> 	 * checkpoint is considered completed, then the method may use the given executor to execute the
> 	 * actual action and would signal its completion by completing the future. For hooks that do not
> 	 * need to store data, the future would be completed with null.
> 	 * 
> 	 * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint
> 	 * @param timestamp The wall clock timestamp when the checkpoint was triggered, for
> 	 *                  info/logging purposes. 
> 	 * @param executor The executor for asynchronous actions
> 	 * 
> 	 * @return Optionally, a future that signals when the hook has completed and that contains
> 	 *         data to be stored with the checkpoint.
> 	 * 
> 	 * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
> 	 */
> 	@Nullable
> 	Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
> 	/**
> 	 * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
> 	 * If the checkpoint did store data from this hook, that data will be passed to this method.
> 	 * 
> 	 * @param checkpointId The ID (logical timestamp) of the restored checkpoint
> 	 * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null.
> 	 * 
> 	 * @throws Exception Exceptions thrown while restoring the checkpoint will cause the
> 	 *                   operation to fail and to possibly fall back to another checkpoint.
> 	 */
> 	void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception;
> 	/**
> 	 * Creates the serializer to (de)serialize the data stored by this hook. The serializer
> 	 * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)}
> 	 * method, and deserializes the data stored in the checkpoint into the object passed to the
> 	 * {@link #restoreCheckpoint(long, Object)} method.
> 	 * 
> 	 * <p>If the hook never returns any data to be stored, then this method may return null as the
> 	 * serializer.
> 	 * 
> 	 * @return The serializer to (de)serialize the data stored by this hook
> 	 */
> 	@Nullable
> 	SimpleVersionedSerializer<T> createCheckpointDataSerializer();
> {code}
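A minimal, self-contained sketch of one hook going through the trigger and restore cycle that the javadoc above describes. Assumptions: `TriggerRestoreHook` is a trimmed copy of the proposed interface and `VersionedSerializer` is a local stand-in shaped like Flink's `SimpleVersionedSerializer`, so the example compiles without Flink; `ExternalMarkerHook` and its marker format are hypothetical, and `main` only imitates what a checkpoint coordinator might do.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class HookLifecycleSketch {

    // Local stand-in shaped like Flink's SimpleVersionedSerializer (assumption, not the real class).
    interface VersionedSerializer<T> {
        int getVersion();
        byte[] serialize(T obj) throws IOException;
        T deserialize(int version, byte[] serialized) throws IOException;
    }

    // Trimmed copy of the proposed hook interface so the sketch compiles without Flink.
    interface TriggerRestoreHook<T> {
        String getIdentifier();
        Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
        void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
        VersionedSerializer<T> createCheckpointDataSerializer();
    }

    // Hypothetical hook that obtains a position marker from an external system before each checkpoint.
    static class ExternalMarkerHook implements TriggerRestoreHook<String> {
        private final String systemName;
        private String restoredMarker; // data handed back on the most recent restore

        ExternalMarkerHook(String systemName) {
            this.systemName = systemName;
        }

        @Override
        public String getIdentifier() {
            // Deterministic, so that resuming a savepoint maps the stored data back to this hook.
            return "external-marker:" + systemName;
        }

        @Override
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            // Asynchronous variant: run the preparatory work on the given executor and complete
            // the future with the data that should be stored in the checkpoint metadata.
            return CompletableFuture.supplyAsync(() -> systemName + "@" + checkpointId, executor);
        }

        @Override
        public void restoreCheckpoint(long checkpointId, String checkpointData) {
            // checkpointData is null if the checkpoint holds no data for this hook.
            restoredMarker = checkpointData;
        }

        String restoredMarker() {
            return restoredMarker;
        }

        @Override
        public VersionedSerializer<String> createCheckpointDataSerializer() {
            return new VersionedSerializer<String>() {
                @Override
                public int getVersion() { return 1; }

                @Override
                public byte[] serialize(String obj) { return obj.getBytes(StandardCharsets.UTF_8); }

                @Override
                public String deserialize(int version, byte[] serialized) throws IOException {
                    if (version != 1) {
                        throw new IOException("Unsupported serializer version " + version);
                    }
                    return new String(serialized, StandardCharsets.UTF_8);
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            ExternalMarkerHook hook = new ExternalMarkerHook("queue-a");
            VersionedSerializer<String> serializer = hook.createCheckpointDataSerializer();

            // Trigger: wait for the hook's future and serialize its result, as a coordinator might.
            String data = hook.triggerCheckpoint(17L, System.currentTimeMillis(), executor).get();
            byte[] stored = serializer.serialize(data);

            // Restore: deserialize with the serializer version and pass the data back to the hook.
            hook.restoreCheckpoint(17L, serializer.deserialize(serializer.getVersion(), stored));
            // prints "external-marker:queue-a -> queue-a@17"
            System.out.println(hook.getIdentifier() + " -> " + hook.restoredMarker());
        } finally {
            executor.shutdown();
        }
    }
}
```

The hook uses the asynchronous style described in the javadoc, doing its work on the given executor. A synchronous hook would instead do the work inline and return an already completed future, for example via `CompletableFuture.completedFuture(...)`.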

This message was sent by Atlassian JIRA
