flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5937) Add documentation about the task lifecycle.
Date Tue, 28 Feb 2017 16:01:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888313#comment-15888313 ]

ASF GitHub Bot commented on FLINK-5937:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3429#discussion_r103484503
  
    --- Diff: docs/internals/task_lifecycle.md ---
    @@ -0,0 +1,149 @@
    +---
    +title:  "Task Lifecycle"
    +nav-title: Task Lifecycle
    +nav-parent_id: internals
    +nav-pos: 5
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
    +
    +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Operator Lifecycle in a nutshell
    +
    +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
    +briefly listing the basic methods that represent the lifecycle of an operator before diving into those of the `StreamTask` itself. The list below presents
    +them in the order in which they are called:
    +
    +    // initialization
    +    setup()
    +    initializeState()
    +    open()
    +
    +    // processing
    +    processElement()
    +    processWatermark()
    +
    +    // termination
    +    close()
    +    dispose()
    +
    +    // checkpointing
    +    snapshotState()
    +
    +In a nutshell, `setup()` is called to initialize some operator-specific
    +machinery, such as its `RuntimeContext` and its metric collection data structures. After that, `initializeState()` gives the operator its initial state, and `open()` executes any operator-specific initialization, such as
    +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
    +
    +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods, which
    +contain the logic for processing elements and watermarks, respectively.
    +
    +Finally, in the case of normal, fault-free termination of the operator (*e.g.* if the stream is finite and its end is reached), the `close()` method is called to
    +perform any final bookkeeping action required by the operator's logic, and `dispose()` is called after that to free any resources held by the operator.
    +
    +In the case of termination due to a failure or due to manual cancellation, execution jumps directly to `dispose()`, skipping any intermediate phases between
    +the phase the operator was in when the failure happened and `dispose()`.
    +
    +**Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
    +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html), from where it is
    +retrieved when the job resumes execution after a failure. For a brief description of Flink's checkpointing mechanism, please keep reading; for a more detailed discussion
    +of the principles behind checkpointing in Flink, please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
    +
    +## Task Lifecycle
    +
    +Given the above brief introduction to the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
    +of the phases described here is mainly contained in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections: one describing the
    +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and a shorter one describing the different sequence followed in case
    +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
    +either manually or due to some other reason, *e.g.* an exception thrown during execution.
    +
    +### Normal Execution
    +
    +The steps a task goes through when executed until completion without being interrupted are illustrated below:
    +
    +    setInitialState()
    +    invoke()
    +        Create basic utils (config, etc.) and load the chain of operators
    +        setup-operators()
    +        task specific init()
    +        initialize-operator-states()
    +        open-operators()
    +        run()
    +        close-operators()
    +        dispose-operators()
    +        task specific cleanup()
    +        common cleanup
    +
    +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
    +task-wide state. This is done in `setInitialState()`, and it is particularly important in two cases:
    +
    +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
    +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
    +
    +If it is the first time the task is executed, the initial task state is empty. 
    +
    +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
    +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
    +depending on the type of the task (`SourceTask`, `OneInputStreamTask`, `TwoInputStreamTask`, etc.), this step may differ, but in any case, this is where the necessary
    +task-wide resources are acquired. As an example, the `OneInputStreamTask`, which represents a task that expects to have a single input stream, initializes the connection(s)
    +to the location(s) of the different partitions of the input stream that are relevant to the local task.
    +
    +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
    +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overriden by
    --- End diff --
    
    "overriden" -> "overridden"
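A small Java sketch may make the ordering described in the diff above easier to check. It is a toy model under stated assumptions, not Flink code: `LifecycleSketch`, `RecordingOperator` and `runTask` are hypothetical names, there is a single operator instead of an operator chain, `snapshotState()` is called inline on the task thread rather than asynchronously, and a thrown exception stands in for any failure or cancellation. The call order follows the task section of the diff (`setup()` before `initializeState()`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of the task/operator lifecycle; all names are hypothetical, NOT Flink's API.
public class LifecycleSketch {

    // Hypothetical operator exposing the hooks named in the document; each call is logged.
    static final class RecordingOperator {
        private final List<String> log;
        private final boolean failOnFirstElement;
        private long count; // toy operator state: number of elements processed

        RecordingOperator(List<String> log, boolean failOnFirstElement) {
            this.log = log;
            this.failOnFirstElement = failOnFirstElement;
        }

        void setup() { log.add("setup"); }

        void initializeState(Optional<Long> restored) {
            // Empty on the first run; present when resuming from a checkpoint or savepoint.
            count = restored.orElse(0L);
            log.add("initializeState(" + count + ")");
        }

        void open() { log.add("open"); }

        void processElement(String element) {
            if (failOnFirstElement) {
                throw new IllegalStateException("simulated failure on " + element);
            }
            count++;
            log.add("processElement(" + element + ")");
        }

        void processWatermark(long watermark) { log.add("processWatermark(" + watermark + ")"); }

        // Stands in for a checkpoint barrier arriving; only records the value that would be stored.
        void snapshotState() { log.add("snapshotState(" + count + ")"); }

        void close() { log.add("close"); }

        void dispose() { log.add("dispose"); }
    }

    // Hypothetical single-operator task following the invoke() sequence from the diff.
    public static List<String> runTask(Optional<Long> initialState, boolean fail) {
        List<String> log = new ArrayList<>();
        RecordingOperator op = new RecordingOperator(log, fail);
        try {
            op.setup();                        // setup-operators()
            log.add("task init");              // task specific init()
            op.initializeState(initialState);  // initialize-operator-states()
            op.open();                         // open-operators()
            op.processElement("a");            // run(): process an element ...
            op.snapshotState();                // ... a checkpoint barrier arrives ...
            op.processWatermark(1L);           // ... then a watermark
            op.close();                        // close-operators(): normal termination only
        } catch (RuntimeException e) {
            log.add("failed");                 // close() is skipped; this sketch records instead of rethrowing
        } finally {
            op.dispose();                      // dispose-operators(): runs on both paths
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTask(Optional.empty(), false));
        System.out.println(runTask(Optional.of(41L), true));
    }
}
```

Running `main` prints the normal sequence, which ends with `close` followed by `dispose`, and then `[setup, task init, initializeState(41), open, failed, dispose]`, where `close` is skipped. In this sketch it is the `finally` block that guarantees `dispose()` runs on both paths.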


> Add documentation about the task lifecycle.
> -------------------------------------------
>
>                 Key: FLINK-5937
>                 URL: https://issues.apache.org/jira/browse/FLINK-5937
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
