flink-commits mailing list archives

From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation
Date Mon, 30 Jan 2017 16:59:42 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 c365a34b8 -> 65b1da8c5

[FLINK-5456] Resurrect and update parts of the state intro documentation

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0666786a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0666786a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0666786a

Branch: refs/heads/release-1.2
Commit: 0666786ad87d3befd248cf7b8c59e686ac29af8e
Parents: c365a34
Author: David Anderson <david@alpinegizmo.com>
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jan 30 17:59:14 2017 +0100

 docs/dev/stream/state.md | 332 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 314 insertions(+), 18 deletions(-)

diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 Stateful functions and operators store data across the processing of individual elements/events,
making state a critical building block for
-any type of more elaborate operation. For example: 
+any type of more elaborate operation. For example:
   - When an application searches for certain event patterns, the state will store the sequence
of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds
the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the state holds
the current version of the model parameters.
 In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning Flink deals
with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large state.
 This document explains how to use Flink's state abstractions when developing an application.
-## Keyed State and Operator state
+## Keyed State and Operator State
-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
-#### Keyed State
+### Keyed State
 *Keyed State* is always relative to keys and can only be used in functions and operators
on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can create in a function
on a `KeyedStream`, as
-well as the state of a keyed window operator.
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed
state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as <operator, key>.
-#### Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.
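The relationship between keys, Key Groups, and parallel instances can be illustrated with a minimal sketch. It is not Flink's implementation: the class `KeyGroupSketch`, the plain `hashCode()` modulo, and the contiguous-range formula are simplifying assumptions chosen only to show that a key's Key Group depends on the maximum parallelism alone, while the owning instance depends on the current parallelism.

```java
public class KeyGroupSketch {

    /** Maps a key to a Key Group in [0, maxParallelism). Assumption: plain hashCode, no extra hashing. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a Key Group to a parallel instance so that each instance owns a contiguous range of groups. */
    static int instanceFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int group = keyGroupFor("user-42", maxParallelism);
        // The Key Group stays the same; only the owning instance changes with the parallelism.
        for (int parallelism : new int[] {2, 4, 8}) {
            System.out.println("parallelism=" + parallelism
                    + " keyGroup=" + group
                    + " instance=" + instanceFor(group, maxParallelism, parallelism));
        }
    }
}
```

Because whole Key Groups move between instances, rescaling in this sketch never has to split the state of a single group.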
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface
in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the
Operator State.
+### Operator State
-Operator State needs special re-distribution schemes when parallelism is changed. There can
be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be different schemes
for doing this redistribution; the following are currently defined:
   - **List-style redistribution:** Each operator returns a List of state elements. The whole
state is logically a concatenation of
     all lists. On restore/redistribution, the list is evenly divided into as many sublists
as there are parallel operators.
     Each operator gets a sublist, which can be empty, or contain one or more elements.
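List-style redistribution can be sketched in plain Java as follows. This is only an illustration of the "concatenate, then divide evenly" idea described above; the class and method names are hypothetical, and the way leftover elements are assigned (to the first sublists) is an assumption, not a statement about Flink's behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListRedistributionSketch {

    /** Concatenates the per-instance lists, then splits the result into newParallelism sublists. */
    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            all.addAll(snapshot);
        }

        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // Assumption: the first `remainder` sublists receive one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> snapshots = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        System.out.println(redistribute(snapshots, 3)); // prints [[a, b], [c, d], [e]]
        System.out.println(redistribute(snapshots, 7)); // some sublists are empty
    }
}
```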
 ## Raw and Managed State
 *Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
 *Managed State* is represented in data structures controlled by the Flink runtime, such as
internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes the states and writes
them into the checkpoints.
+Examples are "ValueState", "ListState", etc. Flink's runtime encodes
+the states and writes them into the checkpoints.
-*Raw State* is state that users and operators keep in their own data structures. When checkpointed,
they only write a sequence of bytes into
+*Raw State* is state that operators keep in their own data structures. When checkpointed,
they only write a sequence of bytes into
 the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
+All datastream functions can use managed state, but the raw state interfaces can only be
used when implementing operators.
+Using managed state (rather than raw state) is recommended, since with
+managed state Flink is able to automatically redistribute state when the parallelism is
+changed, and also do better memory management.
+## Using Managed Keyed State
+The managed keyed state interface provides access to different types of state that are all
scoped to
+the key of the current input element. This means that this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+Now, we will first look at the different types of state available and then we will see
+how they can be used in a program. The available state primitives are:
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element as mentioned above, so there will possibly
be one value
+for each key that the operation sees). The value can be set using `update(T)` and retrieved using
+`T value()`.
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve
an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`, the Iterable can
+be retrieved using `Iterable<T> get()`.
+* `ReducingState<T>`: This keeps a single value that represents the aggregation of
all values
+added to the state. The interface is the same as for `ListState` but elements added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+All types of state also have a method `clear()` that clears the state for the currently
+active key, i.e. the key of the input element.
+It is important to keep in mind that these state objects are only used for interfacing
+with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one invocation of your
+user function can differ from the value in another invocation if the keys involved are different.
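The key scoping described above can be made concrete with a small stand-alone sketch. It does not use the Flink API: `KeyedReducingStateSketch` and `setCurrentKey` are hypothetical names, and a `HashMap` stands in for whatever storage the runtime really uses. The point is only that `add`, `get`, and `clear` always act on the value of the current key, and that added values are folded with a reduce function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class KeyedReducingStateSketch<K, T> {

    private final Map<K, T> valuesByKey = new HashMap<>();
    private final BinaryOperator<T> reduceFunction;
    private K currentKey;

    public KeyedReducingStateSketch(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** In Flink the runtime selects the key of the current element; here it is set by hand. */
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Folds the new value into the aggregate stored for the current key. */
    public void add(T value) {
        valuesByKey.merge(currentKey, value, reduceFunction);
    }

    /** Returns the aggregate for the current key, or null if nothing was added. */
    public T get() {
        return valuesByKey.get(currentKey);
    }

    /** Clears the state for the current key only. */
    public void clear() {
        valuesByKey.remove(currentKey);
    }

    public static void main(String[] args) {
        KeyedReducingStateSketch<String, Long> sum = new KeyedReducingStateSketch<>(Long::sum);
        sum.setCurrentKey("a");
        sum.add(3L);
        sum.add(5L);
        sum.setCurrentKey("b");
        sum.add(7L);
        System.out.println(sum.get()); // prints 7
        sum.setCurrentKey("a");
        System.out.println(sum.get()); // prints 8
    }
}
```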
+To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
+(as we will see later, you can create several states, and they have to have unique names so
+that you can reference them), the type of the values that the state holds, and possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor` or
+a `ReducingStateDescriptor`.
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{ site.baseurl }}/dev/api_concepts#rich-functions) for
+information about that, but we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+This is an example `FlatMapFunction` that shows how all of the parts fit together:
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
Tuple2<Long, Long>> {
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>>
out) throws Exception {
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
+        // update the count
+        currentSum.f0 += 1;
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+        // update the state
+        sum.update(currentSum);
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L),
Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+// the printed output will be (1,4) and (1,5)
+{% endhighlight %}
+This example implements a poor man's counting window. We key the tuples by the first field
+(in the example all have the same key `1`). The function stores the count and a running sum in
+a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so
+we start over from `0`. Note that this would keep a different state value for each different
+key if we had tuples with different values in the first field.
+### State in the Scala DataStream API
+In addition to the interface described above, the Scala API has shortcuts for stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function
+gets the current value of the `ValueState` in an `Option` and must return an updated value that
+will be used to update the state.
+{% highlight scala %}
+val stream: DataStream[(String, Int)] = ...
+val counts: DataStream[(String, Int)] = stream
+  .keyBy(_._1)
+  .mapWithState((in: (String, Int), count: Option[Int]) =>
+    count match {
+      case Some(c) => ( (in._1, c), Some(c + in._2) )
+      case None => ( (in._1, 0), Some(in._2) )
+    })
+{% endhighlight %}
+## Using Managed Operator State
+A stateful function can implement either the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface.
+In both cases, the non-keyed state is expected to be a `List` of *serializable* objects,
independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest
granularity at which
+non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed
state of the `BufferingSink`
+contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` will go to task 1.
+##### ListCheckpointed
+The `ListCheckpointed` interface requires the implementation of two methods:
+    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+    void restoreState(List<T> state) throws Exception;
+On `snapshotState()` the operator should return a list of objects to checkpoint and
+`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable,
you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
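The snapshot/restore round trip can be sketched without the Flink runtime. The nested interface below is a local stand-in that merely copies the two method shapes shown above; it is not the Flink interface, and the `Counter` class is hypothetical. The sketch also assumes that, after rescaling, an instance may be handed zero, one, or several list elements, so `restoreState` merges whatever it receives.

```java
import java.util.Collections;
import java.util.List;

public class ListCheckpointedSketch {

    /** Local stand-in with the same two method shapes as in the text; not the Flink interface. */
    interface SnapshotAndRestore<T> {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(List<T> state) throws Exception;
    }

    /** A counter whose state is one value that cannot be split any further. */
    static class Counter implements SnapshotAndRestore<Long> {
        long count;

        void increment() {
            count++;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // A single element: the finest granularity at which this state can be moved.
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            // Merge all elements handed to this instance; an empty list resets the count to 0.
            count = 0;
            for (Long part : state) {
                count += part;
            }
        }
    }

    public static void main(String[] args) {
        Counter before = new Counter();
        before.increment();
        before.increment();
        List<Long> snapshot = before.snapshotState(1L, System.currentTimeMillis());

        Counter after = new Counter();
        after.restoreState(snapshot);
        System.out.println(after.count); // prints 2
    }
}
```

Summing on restore is a design choice that suits counters; other kinds of state would need their own merge logic.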
+##### CheckpointedFunction
+The `CheckpointedFunction` interface also requires the implementation of two methods:
+    void snapshotState(FunctionSnapshotContext context) throws Exception;
+    void initializeState(FunctionInitializationContext context) throws Exception;
+Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`,
is called every time the user-defined function is initialized, be that when the function is
first initialized
+or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()`
is not
+only the place where different types of state are initialized, but also where state recovery
logic is included.
+This is an example of a function that uses `CheckpointedFunction`: a stateful `SinkFunction` that
+uses state to buffer elements before sending them to the outside world:
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String,
Integer>>> {
+        private final int threshold;
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+        private List<Tuple2<String, Integer>> bufferedElements;
+        public BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            checkpointedState.clear();
+            for (Tuple2<String, Integer> element : bufferedElements) {
+                checkpointedState.add(element);
+            }
+        }
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            checkpointedState = context.getOperatorStateStore().
+                getSerializableListState("buffered-elements");
+            if (context.isRestored()) {
+                for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                    bufferedElements.add(element);
+                }
+            }
+        }
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws
Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
+the non-keyed state "containers". Each is a container of type `ListState` in which the non-keyed state objects
+are going to be stored upon checkpointing.
+`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+After initializing the container, we use the `isRestored()` method of the context to check
if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic
is applied.
+As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
+initialization is kept in a class variable for future use in `snapshotState()`. There the
`ListState` is cleared
+of all objects included by the previous checkpoint, and is then filled with the new ones
we want to checkpoint.
+As a side note, the keyed state can also be initialized in the `initializeState()` method.
This can be done
+using the provided `FunctionInitializationContext`.
+### Stateful Source Functions
+Stateful sources require a bit more care than other operators.
+In order to make the updates to the state and output collection atomic (required for exactly-once semantics
+on failure/recovery), the user is required to get a lock from the source's context.
+{% highlight java %}
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements ListCheckpointed<Long> {
+    /** current offset for exactly-once semantics; starts at 0 so that run() never sees null */
+    private Long offset = 0L;
+    /** flag for job cancellation */
+    private volatile boolean isRunning = true;
+    @Override
+    public void run(SourceContext<Long> ctx) {
+        final Object lock = ctx.getCheckpointLock();
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(offset);
+                offset += 1;
+            }
+        }
+    }
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+    @Override
+    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+        return Collections.singletonList(offset);
+    }
+    @Override
+    public void restoreState(List<Long> state) {
+        for (Long s : state)
+            offset = s;
+    }
+}
+{% endhighlight %}
+Some operators might need to know when a checkpoint has been fully acknowledged by Flink, so that they can communicate this to the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
