flink-issues mailing list archives

From alpinegizmo <...@git.apache.org>
Subject [GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...
Date Wed, 18 Jan 2017 13:16:06 GMT
Github user alpinegizmo commented on a diff in the pull request:

    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
    -### State API
    +### Introduction
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here), Flink has two types of state: **keyed** and **non-keyed** state.
    +Both types are available to user-defined functions, and this document will guide you through the process
    +of migrating your Flink-1.1 function code to Flink-1.2.
    +The migration process will serve two goals:
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling, and
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +As running examples for the remainder of this document, we will use the `CountMapper` and `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while the second has **non-keyed**
    +state. The Flink-1.1 code for these two functions is shown below:
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +        private transient ValueState<Integer> counter;
    +        private final int numberElements;
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +                new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +            counter.update(count);
    +            if (count % numberElements == 0) {
    +                out.collect(Tuple2.of(value.f0, count));
    +                counter.update(0); // reset to 0
    +            }
    +        }
    +    }
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +        private final int threshold;
    +        private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +        BufferingSink(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +        @Override
    +        public ArrayList<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long checkpointTimestamp) throws Exception {
    +            return bufferedElements;
    +        }
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            bufferedElements.addAll(state);
    +        }
    +    }
    +The `CountMapper` is a `RichFlatMapFunction` that assumes an input stream grouped by key, with elements of the
    +form `(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and, whenever
    +the number of occurrences of a certain word reaches the user-provided threshold, it emits a tuple containing
    +the word itself and the number of occurrences, then resets the counter to 0.
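The per-key behaviour described above can be checked without a Flink cluster. The following is a minimal plain-Java sketch, not the Flink API: a `HashMap` stands in for the keyed `ValueState<Integer>`, strings of the form `(word,count)` stand in for the emitted `Tuple2`s, and the class name `CountMapperSketch` and its `run` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of CountMapper's per-key logic, with no Flink dependency.
// A HashMap keyed by word stands in for the keyed ValueState<Integer>; in a real
// job Flink scopes the state to the current key instead.
class CountMapperSketch {
    private final int numberElements;
    private final Map<String, Integer> counterPerKey = new HashMap<>();

    CountMapperSketch(int numberElements) {
        this.numberElements = numberElements;
    }

    // Mirrors flatMap() for an input element (word, 1): increment the word's
    // counter, emit "(word,count)" when it reaches numberElements, then reset it.
    void flatMap(String word, List<String> out) {
        int count = counterPerKey.getOrDefault(word, 0) + 1;
        counterPerKey.put(word, count);
        if (count % numberElements == 0) {
            out.add("(" + word + "," + count + ")");
            counterPerKey.put(word, 0); // reset to 0, as in the original
        }
    }

    // Hypothetical helper: feeds words through one mapper instance and collects the output.
    static List<String> run(int numberElements, String... words) {
        CountMapperSketch mapper = new CountMapperSketch(numberElements);
        List<String> out = new ArrayList<>();
        for (String word : words) {
            mapper.flatMap(word, out);
        }
        return out;
    }
}
```

For example, `CountMapperSketch.run(2, "a", "b", "a", "a", "a")` returns `[(a,2), (a,2)]`. Because the counter is reset after every emission, the emitted count always equals `numberElements`, and `b` is never emitted since it occurs only once.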
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
    +periodically checkpointed.
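The buffering and checkpointing behaviour can be sketched the same way. This is a hypothetical plain-Java class, not the Flink `SinkFunction`/`Checkpointed` API: elements are plain strings, and `flushedBatches` stands in for the external system. One deliberate deviation from the original: the flush condition is `>=` rather than `==`, so that a buffer restored with more than `threshold` elements (which becomes possible once rescaling can merge the buffers of several tasks into one) still gets flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of BufferingSink's logic, with no Flink dependency.
// flushedBatches is a hypothetical stand-in for the external system (e.g. a
// database) that the real sink would write each batch to in a single call.
class BufferingSinkSketch {
    private final int threshold;
    private final List<String> bufferedElements = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>();

    BufferingSinkSketch(int threshold) {
        this.threshold = threshold;
    }

    // Mirrors invoke(): buffer the element and flush the whole batch once the
    // threshold is reached. Uses >= instead of the original ==, so that a buffer
    // restored with more than `threshold` elements still gets flushed.
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flushedBatches.add(new ArrayList<>(bufferedElements));
            bufferedElements.clear();
        }
    }

    // Mirrors snapshotState(): only the not-yet-flushed elements must be checkpointed.
    // Returns a copy so that later invoke() calls cannot change the snapshot.
    List<String> snapshotState() {
        return new ArrayList<>(bufferedElements);
    }

    // Mirrors restoreState(): re-populate the buffer from a checkpoint.
    void restoreState(List<String> state) {
        bufferedElements.addAll(state);
    }
}
```

With a threshold of 2, invoking the sketch with `a`, `b`, `c` flushes `[a, b]` and leaves `[c]` in the snapshot; restoring that snapshot into a new instance and invoking it with `d` flushes `[c, d]`.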
    +### Migration to Flink-1.2
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions.
    +After making these changes, you will be able to change the parallelism of your job (scale up or down), and the
    +new version of your job is guaranteed to start from where its predecessor left off.
    +**Keyed State:** Before delving into the details of the migration process, note that if your function
    +has **only keyed state**, the exact same code from Flink-1.1 also works for Flink-1.2, with full support
    +for the new features and full backwards compatibility. You may still change the code for better organization,
    +but this is purely a matter of style.
    +With that said, the rest of this section focuses on **non-keyed state**.
    +#### Rescaling and new state abstractions
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction`
    +interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
    +`Checkpointed` one.
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects that are independent of
    +each other and thus eligible for redistribution upon rescaling. In other words, these objects are the finest
    +granularity at which non-keyed state can be repartitioned. For example, if with parallelism 1 the checkpointed
    +state of the `BufferingSink` contains the elements `(test1, 2)` and `(test2, 2)`, then after increasing the
    +parallelism to 2, `(test1, 2)` may end up in task 0 and `(test2, 2)` in task 1.
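To make the redistribution example above concrete, here is a plain-Java sketch of one way list state can be repartitioned: the lists checkpointed by all old tasks are concatenated and then cut into contiguous, roughly equal-sized sublists, one per new task. This is not Flink's implementation, and the class and method names are hypothetical. The scheme matches what Flink's operator-state documentation calls "even-split redistribution", but treat the exact assignment as an assumption; the text above only promises that each element ends up in some task.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink's code) of redistributing non-keyed list state
// on rescaling: concatenate all old tasks' lists, then cut the result into
// newParallelism contiguous sublists whose sizes differ by at most one.
class ListStateRedistributionSketch {
    static <T> List<List<T>> redistribute(List<List<T>> oldTaskStates, int newParallelism) {
        // Logically, the whole state is the concatenation of every task's list.
        List<T> all = new ArrayList<>();
        for (List<T> taskState : oldTaskStates) {
            all.addAll(taskState);
        }
        List<List<T>> newTaskStates = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int task = 0; task < newParallelism; task++) {
            // The first `remainder` tasks receive one extra element.
            int size = base + (task < remainder ? 1 : 0);
            newTaskStates.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return newTaskStates;
    }
}
```

Applied to the `BufferingSink` example, redistributing the single list `[(test1, 2), (test2, 2)]` to parallelism 2 gives task 0 `[(test1, 2)]` and task 1 `[(test2, 2)]`; scaling two single-element lists down to parallelism 1 merges them into one list.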
    +More details on the principles behind rescaling of both keyed and non-keyed state can be found
    +[here](link here).
    +##### ListCheckpointed
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +    void restoreState(List<T> state) throws Exception;
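The two signatures above can be exercised without a Flink dependency. The sketch below declares a hypothetical `ListCheckpointedLike<T>` interface that only mirrors those signatures (it is not Flink's `ListCheckpointed`) and implements it for a hypothetical counter whose state cannot be split, using the `Collections.singletonList(...)` pattern. Summing the restored entries is an assumption that suits a counter: after rescaling, a task may receive none, one, or several of the previously checkpointed entries.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in that mirrors the two ListCheckpointed methods, so the
// sketch compiles without Flink on the classpath. It is NOT Flink's interface.
interface ListCheckpointedLike<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// A function whose whole state is one counter that cannot be split, so it is
// checkpointed as a single-element list.
class CountingFunctionSketch implements ListCheckpointedLike<Long> {
    private long count;

    void process() {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // The restored list may be empty or hold several old counters after
        // rescaling; summing them keeps the total (a counter-specific choice).
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }
}
```

Whatever policy a real function uses, `restoreState` should not assume the list has exactly one element once the job can be rescaled.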
    +The semantics of these two methods are the same as those of their counterparts in the old `Checkpointed` interface.
    +The only difference is that `snapshotState()` now returns a list of objects to checkpoint, as stated earlier, and
    +`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
    +return a `Collections.singletonList(MY_STATE)` in `snapshotState()`. The updated code for the `BufferingSink`
    +is included below:
    +    public class BufferingSinkListCheckpointed implements
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>,
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +        private final int threashold;
    --- End diff --
    threshold is frequently misspelled in this example
