flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Date Mon, 07 May 2018 07:16:07 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5922#discussion_r186341885
  
    --- Diff: docs/dev/stream/state/broadcast_state.md ---
    @@ -0,0 +1,279 @@
    +---
    +title: "The Broadcast State Pattern"
    +nav-parent_id: streaming_state
    +nav-pos: 2
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* ToC
    +{:toc}
    +
    +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the
    +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks.
    +
    +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases
    +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally
    +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a
    +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all
    +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest
    +of operator states in that:
    + 1. it has a map format,
    + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and
    + 3. such an operator can have *multiple broadcast states* with different names.
    +
    +## Provided APIs
    +
    +To show the provided APIs, we will start with an example before presenting their full functionality. As our running
    +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs
    +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that
    +the set of interesting patterns evolves over time.
    +
    +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other
    +stream will contain the `Rules`.
    +
    +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will
    +make sure that elements of the same color end up on the same physical machine.
    +
    +{% highlight java %}
    +// key the items by color
    +KeyedStream<Item, Color> colorPartitionedStream = shapeStream
    +                        .keyBy(new KeySelector<Item, Color>(){...});
    +{% endhighlight %}
    +
    +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks
    +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast
    +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules
    +will be stored.
    +
    +{% highlight java %}
    +
    +// a map descriptor to store the name of the rule (string) and the rule itself.
    +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
    +				"RulesBroadcastState",
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				TypeInformation.of(new TypeHint<Rule>() {})
    +		);
    +		
    +// broadcast the rules and create the broadcast state
    +BroadcastStream<Rule> ruleBroadcastStream = ruleStream
    +                        .broadcast(ruleStateDescriptor);
    +{% endhighlight %}
    +
    +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to:
    +    1) connect the two streams and 
    +    2) specify our match detecting logic. 
    +
    +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the
    +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on
    +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic.
    +The exact type of the function depends on the type of the non-broadcasted stream: 
    + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. 
    + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
    + 
    + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls:
    +
    +<div class="alert alert-info">
    +  <strong>Attention:</strong> The connect should be called on the non-broadcasted stream, with the `BroadcastStream`
    +   as an argument.
    +</div>
    +
    +{% highlight java %}
    +DataStream<String> output = colorPartitionedStream
    +                 .connect(ruleBroadcastStream)
    +                 .process(
    +                     
    +                     // type arguments in our KeyedBroadcastProcessFunction represent:
    +                     //   1. the key of the keyed stream
    +                     //   2. the type of elements in the non-broadcast side
    +                     //   3. the type of elements in the broadcast side
    +                     //   4. the type of the result, here a string
    +                     
    +                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
    +                         // my matching logic
    +                     }
    +                 );
    +{% endhighlight %}
    +
    +### BroadcastProcessFunction and KeyedBroadcastProcessFunction
    +
    +As in the case of a `CoProcessFunction`, these methods have two "sides", one is responsible for processing incoming
    --- End diff --
    
    maybe we can just say -
    
    As in the case of a `CoProcessFunction`, these functions have two process methods to implement; one is ......
    IMO, it's more direct, and more formal. What do you think?
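
To make the "two process methods" wording concrete, here is a minimal plain-Java sketch of the idea. It does not use Flink: `TwoSidedFunction`, `ShapeMatcher`, and `run()` are hypothetical names invented for illustration, and the `Map` and `List` parameters only stand in for broadcast state and an output collector. It shows one method handling the non-broadcast side and another handling the broadcast side, both working against the same locally stored rules.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoSidedSketch {

    // Hypothetical stand-in for a two-input function; this is NOT Flink's API.
    interface TwoSidedFunction<IN1, IN2, OUT> {
        // Called for each element arriving on the non-broadcast side.
        void processElement(IN1 value, Map<String, IN2> broadcastState, List<OUT> out);

        // Called for each element arriving on the broadcast side.
        void processBroadcastElement(IN2 value, Map<String, IN2> broadcastState, List<OUT> out);
    }

    // Toy matcher: a rule is simply the name of a shape to look for.
    static class ShapeMatcher implements TwoSidedFunction<String, String, String> {
        @Override
        public void processElement(String shape, Map<String, String> broadcastState, List<String> out) {
            // Only reads the stored rules; emits a match if any rule names this shape.
            if (broadcastState.containsValue(shape)) {
                out.add("matched " + shape);
            }
        }

        @Override
        public void processBroadcastElement(String rule, Map<String, String> broadcastState, List<String> out) {
            // Stores the incoming rule under a generated name.
            broadcastState.put("rule-" + broadcastState.size(), rule);
        }
    }

    static List<String> run() {
        Map<String, String> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        ShapeMatcher fn = new ShapeMatcher();

        fn.processElement("triangle", state, out);          // no rule stored yet, nothing emitted
        fn.processBroadcastElement("triangle", state, out); // rule arrives on the broadcast side
        fn.processElement("triangle", state, out);          // now matches the stored rule
        fn.processElement("circle", state, out);            // no rule names a circle
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [matched triangle]
    }
}
```

In the real API the two methods would receive Flink's context and collector objects rather than a raw map and list; the sketch only illustrates the shape of the contract that the sentence describes.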


---
