flink-commits mailing list archives

From tzuli...@apache.org
Subject [15/15] flink git commit: [FLINK-6848] [doc] Update managed state docs to include Scala snippets
Date Tue, 13 Jun 2017 05:49:12 GMT
[FLINK-6848] [doc] Update managed state docs to include Scala snippets

Add an example of how to work with managed state in Scala.

This closes #4072.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db975260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db975260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db975260

Branch: refs/heads/release-1.3
Commit: db975260cfba8b089d4f477a0ebfc011d0bda130
Parents: 1bdd19d
Author: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Authored: Sun Jun 4 16:08:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 07:48:33 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md | 212 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db975260/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0025fae..65d0d75 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -115,7 +115,7 @@ of elements that are added to the state. The interface is the same as for `ListState` but elements
 added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
 
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
-retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or 
+retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
 views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
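
For illustration, a minimal sketch (not part of this commit) of the `MapState` calls just described, in the Scala style of the snippets added below; the `CountPerKey` class and the "word-counts" descriptor name are invented for this example:

{% highlight scala %}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._

class CountPerKey extends RichFlatMapFunction[(String, Long), (String, Long)] {

  private var counts: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long](
        "word-counts",
        createTypeInformation[String],
        createTypeInformation[Long]))
  }

  override def flatMap(input: (String, Long), out: Collector[(String, Long)]): Unit = {
    // put(UK, UV) adds or replaces a single mapping
    val current = if (counts.contains(input._1)) counts.get(input._1) else 0L
    counts.put(input._1, current + input._2)

    // entries() gives an Iterable view over all currently stored mappings
    for (entry <- counts.entries()) {
      out.collect((entry.getKey, entry.getValue))
    }
  }
}
{% endhighlight %}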
 
@@ -152,6 +152,8 @@ is available in a `RichFunction` has these methods for accessing state:
 
 This is an example `FlatMapFunction` that shows how all of the parts fit together:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
@@ -201,6 +203,66 @@ env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
 
 // the printed output will be (1,4) and (1,5)
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+  private var sum: ValueState[(Long, Long)] = _
+
+  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
+
+    // access the state value
+    val tmpCurrentSum = sum.value
+
+    // If it hasn't been used before, it will be null
+    val currentSum = if (tmpCurrentSum != null) {
+      tmpCurrentSum
+    } else {
+      (0L, 0L)
+    }
+
+    // update the count
+    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
+
+    // update the state
+    sum.update(newSum)
+
+    // if the count reaches 2, emit the average and clear the state
+    if (newSum._1 >= 2) {
+      out.collect((input._1, newSum._2 / newSum._1))
+      sum.clear()
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    sum = getRuntimeContext.getState(
+      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
+    )
+  }
+}
+
+
+object ExampleCountWindowAverage extends App {
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  env.fromCollection(List(
+    (1L, 3L),
+    (1L, 5L),
+    (1L, 7L),
+    (1L, 4L),
+    (1L, 2L)
+  )).keyBy(_._1)
+    .flatMap(new CountWindowAverage())
+    .print()
+  // the printed output will be (1,4) and (1,5)
+
+  env.execute("ExampleManagedState")
+}
+{% endhighlight %}
+</div>
+</div>
 
 This example implements a poor man's counting window. We key the tuples by the first field
 (in the example all have the same key `1`). The function stores the count and a running sum in
@@ -268,6 +330,8 @@ Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
 to buffer elements before sending them to the outside world. It demonstrates
 the basic even-split redistribution list state:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class BufferingSink
         implements SinkFunction<Tuple2<String, Integer>>,
@@ -311,7 +375,7 @@ public class BufferingSink
                 "buffered-elements",
                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                 Tuple2.of(0L, 0L));
-                
+
         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
         if (context.isRestored()) {
@@ -328,6 +392,59 @@ public class BufferingSink
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class BufferingSink(threshold: Int = 0)
+  extends SinkFunction[(String, Int)]
+    with CheckpointedFunction
+    with CheckpointedRestoring[List[(String, Int)]] {
+
+  @transient
+  private var checkpointedState: ListState[(String, Int)] = null
+
+  private val bufferedElements = ListBuffer[(String, Int)]()
+
+  override def invoke(value: (String, Int)): Unit = {
+    bufferedElements += value
+    if (bufferedElements.size == threshold) {
+      for (element <- bufferedElements) {
+        // send it to the sink
+      }
+      bufferedElements.clear()
+    }
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    checkpointedState.clear()
+    for (element <- bufferedElements) {
+      checkpointedState.add(element)
+    }
+  }
+
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    val descriptor = new ListStateDescriptor[(String, Int)](
+      "buffered-elements",
+      TypeInformation.of(new TypeHint[(String, Int)]() {})
+    )
+
+    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+    if (context.isRestored) {
+      for (element <- checkpointedState.get()) {
+        bufferedElements += element
+      }
+    }
+  }
+
+  override def restoreState(state: List[(String, Int)]): Unit = {
+    bufferedElements ++= state
+  }
+}
+{% endhighlight %}
+</div>
+</div>
 
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
@@ -337,16 +454,32 @@ Note how the state is initialized, similar to keyed state,
 with a `StateDescriptor` that contains the state name and information
 about the type of the value that the state holds:
 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "buffered-elements",
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
-        Tuple2.of(0L, 0L));
+        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
 
 checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 {% endhighlight %}
 
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val descriptor = new ListStateDescriptor[(String, Long)](
+    "buffered-elements",
+    TypeInformation.of(new TypeHint[(String, Long)]() {})
+)
+
+checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+{% endhighlight %}
+</div>
+</div>
 The naming convention of the state access methods contains the redistribution
 pattern followed by the state structure. For example, to use list state with the
 union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
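
The sketch below (not part of the diff) contrasts the two calls on the `OperatorStateStore`, reusing the descriptor from the `BufferingSink` example above; only one of the two registrations would be used for a given state name:

{% highlight scala %}
val descriptor = new ListStateDescriptor[(String, Int)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Int)]() {}))

// even-split redistribution on restore
checkpointedState = context.getOperatorStateStore.getListState(descriptor)

// union redistribution on restore: every parallel instance receives the
// complete list of state elements
// checkpointedState = context.getOperatorStateStore.getUnionListState(descriptor)
{% endhighlight %}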
@@ -385,6 +518,8 @@ Stateful sources require a bit more care as opposed to other operators.
 In order to make the updates to the state and output collection atomic (required for exactly-once semantics
 on failure/recovery), the user is required to get a lock from the source's context.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public static class CounterSource
         extends RichParallelSourceFunction<Long>
@@ -426,6 +561,46 @@ public static class CounterSource
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CounterSource
+       extends RichParallelSourceFunction[Long]
+       with ListCheckpointed[Long] {
+
+  @volatile
+  private var isRunning = true
+
+  private var offset = 0L
+
+  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (isRunning) {
+      // output and state update are atomic
+      lock.synchronized({
+        ctx.collect(offset)
+
+        offset += 1
+      })
+    }
+  }
+
+  override def cancel(): Unit = isRunning = false
+
+  override def restoreState(state: util.List[Long]): Unit =
+    for (s <- state) {
+      offset = s
+    }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
+    Collections.singletonList(offset)
+
+}
+{% endhighlight %}
+</div>
+</div>
 
 Some operators might need to know when a checkpoint is fully acknowledged by Flink in order to communicate that with the outside world. In this case, see the `org.apache.flink.runtime.state.CheckpointListener` interface.
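
As a hedged sketch of that interface in the Scala style used above (the sink, its buffer, and the commented-out commit call are invented for illustration):

{% highlight scala %}
import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.mutable.ListBuffer

class CommittingSink extends SinkFunction[String] with CheckpointListener {

  // elements written tentatively since the last completed checkpoint
  private val pending = ListBuffer[String]()

  override def invoke(value: String): Unit = {
    pending += value
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // the checkpoint is now fully acknowledged by Flink, so it is safe to
    // make the buffered side effects visible to the outside world
    // commitToExternalSystem(pending.toList)  // hypothetical helper
    pending.clear()
  }
}
{% endhighlight %}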
 
@@ -433,7 +608,7 @@ Some operators might need the information when a checkpoint is fully acknowledge
 
 This section is targeted as a guideline for users who require the use of custom serialization for their state, covering how
 to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
-Flink's own serializers, this section is irrelevant and can be skipped. 
+Flink's own serializers, this section is irrelevant and can be skipped.
 
 ### Using custom serializers
 
@@ -444,14 +619,33 @@ to specify the state's name, as well as information about the type of the state.
 It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
 simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
+
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "state-name",
-        new TypeSerializer<> {...});
+        new CustomTypeSerializer());
 
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+    "state-name",
+    new CustomTypeSerializer)
+
+checkpointedState = getRuntimeContext.getListState(descriptor)
+{% endhighlight %}
+</div>
+</div>
 
 Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
 subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
@@ -542,7 +736,7 @@ The above cases can be translated to code by returning one of the following from
 
   * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to
     be compatible, and Flink can proceed with the job with the serializer as is.
-    
+
   * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
     reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
     is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again
@@ -551,7 +745,7 @@ The above cases can be translated to code by returning one of the following from
   * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
     to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded
     to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort.
-  
+
 <span class="label label-danger">Attention</span> Currently, as of Flink 1.3, if the result of the compatibility check
 acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state
 migration is currently not available. The ability to migrate state will be introduced in future releases.
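
To make the outcomes above concrete, here is a sketch of the compatibility method that `TypeSerializer` exposes for this check in Flink 1.3, `ensureCompatibility(TypeSerializerConfigSnapshot)`; the `MyConfigSnapshot` type and the version test are invented for illustration:

{% highlight scala %}
// inside a custom TypeSerializer[(String, Int)] implementation;
// all other serializer methods are elided
override def ensureCompatibility(
    configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[(String, Int)] = {

  configSnapshot match {
    // the snapshot matches what this serializer expects: proceed as is
    case snapshot: MyConfigSnapshot if snapshot.getVersion == 1 =>
      CompatibilityResult.compatible()

    // anything else requires state migration, which (as noted above)
    // currently fails the restore in Flink 1.3
    case _ =>
      CompatibilityResult.requiresMigration()
  }
}
{% endhighlight %}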
@@ -560,7 +754,7 @@ migration is currently not available. The ability to migrate state will be introduced in future releases.
 
 Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state
 values, the availability of the classes within the classpath may affect restore behaviour.
- 
+
 `TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new
 serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be
 able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified

