From: uce@apache.org
To: commits@flink.apache.org
Date: Mon, 04 Apr 2016 08:42:58 -0000
Message-Id: <20c36ed8d93647babeda3fd45ff2f4ac@git.apache.org>
In-Reply-To: <345cff998af34598a155b6abad9cd1c9@git.apache.org>
Subject: [09/11] flink git commit: [docs] Add "concepts" documentation page

[docs] Add "concepts" documentation page

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f41dd6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f41dd6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f41dd6a

Branch:    refs/heads/release-1.0
Commit:    7f41dd6a7a1b62c20093f7c6ee39930b794cf543
Parents:   974c66e
Author:    Stephan Ewen
Authored:  Thu Mar 31 19:25:35 2016 +0200
Committer: Ufuk Celebi
Committed: Mon Apr 4 10:42:03 2016 +0200

----------------------------------------------------------------------
 docs/concepts/concepts.md                              | 246 +
 docs/concepts/fig/checkpoints.svg                      | 249 +
 docs/concepts/fig/event_ingestion_processing_time.svg  | 375 +
 docs/concepts/fig/parallel_dataflow.svg                | 487 +
 docs/concepts/fig/processes.svg                        | 749 +
 docs/concepts/fig/program_dataflow.svg                 | 546 +
 docs/concepts/fig/slot_sharing.svg                     | 721 +
 docs/concepts/fig/state_partitioning.svg               | 291 +
 docs/concepts/fig/tasks_chains.svg                     | 463 +
 docs/concepts/fig/tasks_slots.svg                      | 395 +
 docs/concepts/fig/windows.svg                          | 193 +
 docs/page/css/flink.css                                |  20 +
 12 files changed, 4735 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7f41dd6a/docs/concepts/concepts.md
----------------------------------------------------------------------
diff --git a/docs/concepts/concepts.md b/docs/concepts/concepts.md
new file mode 100644
index 0000000..1e8e247

---
title: "Concepts"
---

* This will be replaced by the TOC
{:toc}


## Programs and Dataflows

The basic building blocks of Flink programs are **streams** and **transformations** (note that a DataSet is
internally also a stream). A *stream* is an intermediate result, and a *transformation* is an operation that
takes one or more streams as input and computes one or more result streams from them.

When executed, Flink programs are mapped to **streaming dataflows**, consisting of **streams** and
transformation **operators**. Each dataflow starts with one or more **sources** and ends in one or more
**sinks**. The dataflows may resemble arbitrary **directed acyclic graphs** *(DAGs)*. (Special forms of
cycles are permitted via *iteration* constructs; we omit this here for simplicity.)

In most cases, there is a one-to-one correspondence between the transformations in the program and the
operators in the dataflow. Sometimes, however, one transformation may consist of multiple transformation
operators.

*Figure: A DataStream program, and its dataflow.*
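The correspondence between program and dataflow is easiest to see in code. Below is a minimal sketch of a
DataStream program in the Java API that would produce a dataflow of this shape: a source, a chain of
transformations, and a sink. The socket source, the window size, and the word-splitting logic are
illustrative assumptions, not part of the original page.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: every dataflow starts with one or more sources
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> counts = lines
            // Transformation: here, one transformation maps to one operator
            .flatMap(new Tokenizer())
            // keyBy/window: redistributes the stream by key (hash partitioning)
            .keyBy(0)
            .timeWindow(Time.minutes(5))
            .sum(1);

        // Sink: the dataflow ends here
        counts.print();

        env.execute("Window WordCount");
    }

    // Splits lines into (word, 1) pairs
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```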
**Parallel Dataflows**

Programs in Flink are inherently parallel and distributed. *Streams* are split into **stream partitions**,
and *operators* are split into **operator subtasks**. The operator subtasks execute independently of each
other, in different threads and possibly on different machines or containers.

The number of operator subtasks is the **parallelism** of that particular operator. The parallelism of a
stream is always that of its producing operator. Different operators of the same program may have different
parallelism.

*Figure: A parallel dataflow.*

Streams can transport data between two operators in a *one-to-one* (or *forwarding*) pattern, or in a
*redistributing* pattern:

  - **One-to-one** streams (for example between the *source* and the *map()* operators) preserve the
    partitioning and order of elements. That means that subtask[1] of the *map()* operator will see the same
    elements in the same order as they were produced by subtask[1] of the *source* operator.

  - **Redistributing** streams (between *map()* and *keyBy/window*, as well as between *keyBy/window* and
    *sink*) change the partitioning of streams. Each *stream partition* splits itself up and sends data to
    different target subtasks, depending on the selected transformation. Examples are *keyBy()*
    (re-partitions by hash code), *broadcast()*, or *rebalance()* (random redistribution).
    In a *redistributing* exchange, the order among elements is only preserved within each pair of sending
    and receiving subtasks (for example subtask[1] of *map()* and subtask[2] of *keyBy/window*).


**Tasks & Operator Chains**

For distributed execution, Flink *chains* operator subtasks together into *tasks*. Each task is executed by
one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of
thread-to-thread handover and buffering, and increases overall throughput while decreasing latency. The
chaining behavior can be configured in the APIs, as the sketch below shows.

The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.

*Figure: Operator chaining into tasks.*
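As a rough illustration of those configuration hooks, here is a hedged fragment using the Java API. The
operator functions (`ParseRecord`, `HeavyTransform`, `SideComputation`) are placeholders; the relevant knobs
are `setParallelism` on individual operators and the per-operator `startNewChain` / `disableChaining` calls.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Base parallelism for all operators of this program
env.setParallelism(2);

DataStream<String> lines = env.socketTextStream("localhost", 9999); // placeholder source

lines
    .map(new ParseRecord())      // chained with the source by default
    .map(new HeavyTransform())
        .startNewChain()         // this operator begins a new chain
        .setParallelism(4)       // and runs with its own, higher parallelism
    .print();

lines
    .map(new SideComputation())
    .disableChaining()           // never chain this operator to anything
    .print();
```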
## Distributed Execution

**Master, Worker, Client**

The Flink runtime consists of two types of processes:

  - The **master** processes (also called *JobManagers*) coordinate the distributed execution. They schedule
    tasks, coordinate checkpoints, coordinate recovery on failures, etc.

    There is always at least one master process. A high-availability setup will have multiple master
    processes, out of which one is always the *leader* and the others are *standby*.

  - The **worker** processes (also called *TaskManagers*) execute the *tasks* (or, more specifically, the
    subtasks) of a dataflow, and buffer and exchange the data *streams*.

    There must always be at least one worker process.

The master and worker processes can be started in an arbitrary fashion: directly on the machines, via
containers, or via resource frameworks like YARN. Workers connect to masters, announce themselves as
available, and get work assigned.

The **client** is not part of the runtime and program execution, but is used to prepare and send the dataflow
to the master. After that, the client can disconnect, or stay connected to receive progress reports. The
client runs either as part of the Java/Scala program that triggers the execution, or in the command line
process `./bin/flink run ...`.

*Figure: The processes involved in executing a Flink dataflow.*


**Workers, Slots, Resources**

Each worker (TaskManager) is a *JVM process*, and may execute one or more subtasks in separate threads. To
control how many tasks a worker accepts, a worker has so-called **task slots** (at least one).

Each *task slot* is a fixed subset of the resources of the TaskManager. A TaskManager with three slots, for
example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask
will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of
reserved managed memory. Note that no CPU isolation happens here; slots currently only separate the managed
memory of tasks.

Adjusting the number of task slots thus allows users to define how subtasks are isolated from each other.
Having one slot per TaskManager means that each task group runs in a separate JVM (which can be started in a
separate container, for example). Having multiple slots means that more subtasks share the same JVM. Tasks in
the same JVM share TCP connections (via multiplexing) and heartbeat messages, and may share data sets and
data structures, thus reducing the per-task overhead.

*Figure: A TaskManager with task slots and tasks.*

By default, Flink allows subtasks to share slots if they are subtasks of different tasks, but from the same
job. The result is that one slot may hold an entire pipeline of the job. Allowing this *slot sharing* has two
main benefits:

  - A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. There is no
    need to calculate how many tasks (with varying parallelism) a program contains in total.

  - It is easier to get better resource utilization. Without slot sharing, the non-intensive *source/map()*
    subtasks would block as many resources as the resource-intensive *window* subtasks. With slot sharing,
    increasing the base parallelism from two to six yields full utilization of the slotted resources, while
    still making sure that each TaskManager gets only a fair share of the heavy subtasks.

The slot sharing behavior can be controlled in the APIs, to prevent sharing where it is undesirable. The
mechanism for this is *resource groups*, which define which (sub)tasks may share slots (see the sketch
below).

As a rule of thumb, a good default number of task slots is the number of CPU cores. With hyper-threading,
each slot then takes 2 or more hardware thread contexts.

*Figure: TaskManagers with shared task slots.*
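In the DataStream API, resource groups surface as *slot sharing groups*. The fragment below is a hedged
sketch: the `slotSharingGroup(...)` call and the group names are assumptions based on the Flink 1.x API, and
the source and window function are placeholders. Unless a group is set explicitly, an operator is assumed to
inherit the group of its inputs.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> events = env
    .addSource(new SensorSource())     // placeholder source
    .slotSharingGroup("ingest");       // the source (and operators chained to it) use group "ingest"

events
    .keyBy("sensorId")
    .timeWindow(Time.minutes(1))
    .apply(new HeavyWindowFunction())  // placeholder window function
    // Put the resource-hungry window operator into its own group, so its
    // subtasks never share a slot with the ingestion subtasks.
    .slotSharingGroup("windows")
    .print();
```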
## Time and Windows

Aggregating events (e.g., counts, sums) works slightly differently on streams than in batch processing. For
example, it is impossible to first count all elements in the stream and then return the count, because
streams are in general infinite (unbounded). Instead, aggregates on streams (counts, sums, etc.) are scoped
by **windows**, such as *"count over the last 5 minutes"* or *"sum of the last 100 elements"*.

Windows can be *time driven* (example: every 30 seconds) or *data driven* (example: every 100 elements). One
typically distinguishes different types of windows, such as *tumbling windows* (no overlap), *sliding
windows* (with overlap), and *session windows* (punctuated by a gap of inactivity).

*Figure: Time- and count windows.*

More window examples can be found in this
[blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).


**Time**

When referring to time in a streaming program (for example to define windows), one can refer to different
notions of time:

  - **Event time** is the time when an event was created. It is usually described by a timestamp in the
    events, for example attached by the producing sensor or the producing service. Flink accesses event
    timestamps via [timestamp assigners]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html).

  - **Ingestion time** is the time when an event enters the Flink dataflow at the source operator.

  - **Processing time** is the local time at each operator that performs a time-based operation.

*Figure: Event time, ingestion time, and processing time.*

More details on how to handle time are in the
[event time docs]({{ site.baseurl }}/apis/streaming/event_time.html).


## State and Fault Tolerance

While many operations in a dataflow simply look at one individual *event at a time* (for example an event
parser), some operations remember information across individual events (for example window operators). These
operations are called **stateful**.

The state of stateful operations is maintained in what can be thought of as an embedded key/value store. The
state is partitioned and distributed strictly together with the streams that are read by the stateful
operators. Hence, access to the key/value state is only possible on *keyed streams*, after a *keyBy()*
function, and is restricted to the values associated with the current event's key. Aligning the keys of
streams and state makes sure that all state updates are local operations, guaranteeing consistency without
transaction overhead. This alignment also allows Flink to redistribute the state and adjust the stream
partitioning transparently.

*Figure: State and partitioning.*

**Checkpoints for Fault Tolerance**

Flink implements fault tolerance using a combination of **stream replay** and **checkpoints**. A checkpoint
defines a consistent point in the streams and state from which a streaming dataflow can resume while
maintaining consistency *(exactly-once processing semantics)*. The events and state updates since the last
checkpoint are replayed from the input streams.

The checkpoint interval is a means of trading off the overhead of fault tolerance during execution against
the recovery time (the number of events that need to be replayed).

More details on checkpoints and fault tolerance are in the
[fault tolerance docs]({{ site.baseurl }}/internals/stream_checkpointing.html).

*Figure: Checkpoints and snapshots.*

**State Backends**

The exact data structures in which the key/value indexes are stored depend on the chosen **state backend**.
One state backend stores data in an in-memory hash map, another state backend uses
[RocksDB](http://rocksdb.org) as the key/value index. In addition to defining the data structure that holds
the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value
state and store that snapshot as part of a checkpoint.
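A hedged sketch of how these knobs appear in the Java API: choosing a notion of time, enabling checkpointing
with a given interval, and selecting a state backend. The `FsStateBackend` class name is taken from the
Flink 1.x runtime, and the checkpoint path is a placeholder.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time for time-based operators (processing time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Draw a checkpoint every 10 seconds: a longer interval means less overhead
        // during regular execution, but more events to replay on recovery
        env.enableCheckpointing(10000);

        // Store key/value state snapshots in a file system rather than in the
        // master's memory (the path is a placeholder)
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // ... define sources, transformations, and sinks here ...

        env.execute("Checkpointed job");
    }
}
```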
One state backend +stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value index. +In addition to defining the data structure that holds the state, the state backends also implements the logic to +take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint. + + +## Batch on Streaming + +Flink executes batch programs as a special case of streaming programs, where the streams are bounded (finite number of elements). +A *DataSet* is treated internally as a stream of data. The concepts above thus apply to batch programs in the +same way as well as they apply to streaming programs, with minor exceptions: + + - Programs in the DataSet API do not use checkpoints. Recovery happens by fully replaying the streams. + That is possible, because inputs are bounded. This pushes the cost more towards the recovery, + but makes the regular processing cheaper, because it avoids checkpoints. + + - Stateful operation in the DataSet API use simplified in-memory/out-of-core data structures, rather than + key/value indexes. + + - The DataSet API introduces special synchronized (superstep-based) iterations, which are only possible on + bounded streams. For details, check out the [iteration docs]({{ site.baseurl }}/apis/batch/iterations.html). + http://git-wip-us.apache.org/repos/asf/flink/blob/7f41dd6a/docs/concepts/fig/checkpoints.svg ---------------------------------------------------------------------- diff --git a/docs/concepts/fig/checkpoints.svg b/docs/concepts/fig/checkpoints.svg new file mode 100644 index 0000000..c824296 --- /dev/null +++ b/docs/concepts/fig/checkpoints.svg @@ -0,0 +1,249 @@ + + + + + + + + + + image/svg+xml + + + + + + + + + Task + Manager + + Task + Manager + + Task + Manager + + Job + Manager + (master) + (workers) + + + + + + + + + + + + (snapshot store) + + + + store state + snapshots + + +