flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction
Date Tue, 23 Aug 2016 23:52:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433883#comment-15433883 ]

ASF GitHub Bot commented on FLINK-3899:
---------------------------------------

Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75973261
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
    +latest event-time timestamp from a window of sensor readings that carry a timestamp,
    +and the `ReduceFunction`/`WindowFunction` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static class MyFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +        out.collect(timestamps.iterator().next());
    --- End diff --
    
    @fhueske does this look OK for this case? If so, I'll finish up by adding the Reduce example and the corresponding Scala versions of both examples.
    ```java
    // for folding incremental computation
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyWindowFunction());
    
    /* ... */
    
    // Counts the readings in a window; key and window end are filled in by MyWindowFunction.
    private static class MyFoldFunction
            implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {
    
        public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
            return new Tuple3<String, Long, Integer>(acc.f0, acc.f1, acc.f2 + 1);
        }
    }
    
    // Emits (key, window end, count); the Iterable holds a single element, the fold result.
    private static class MyWindowFunction implements WindowFunction<Tuple3<String, Long, Integer>,
            Tuple3<String, Long, Integer>, String, TimeWindow> {
    
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple3<String, Long, Integer>> counts,
                          Collector<Tuple3<String, Long, Integer>> out) {
            Integer count = counts.iterator().next().f2;
            out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(), count));
        }
    }
    ```
    
    I found that I had to include `Tuple3` in the `FoldFunction`'s signature, since the `WindowFunction` must be of the form `WindowFunction<ACC, ACC, K, W>`, according to [FoldApplyWindowFunction.java](https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java)
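A rough plain-Java model of the constraint described above, assuming it captures the shape of the fold-plus-window-function signature: the hypothetical `foldThenApply` helper types its window function as `WindowFn<ACC, ACC, K>`, so whatever the window step emits must be the accumulator type. That is why the accumulator has to carry the key and window end as well as the count. `WindowFn`, `foldThenApply` and `CountResult` are made up for illustration, and `windowEnd` stands in for `TimeWindow.getEnd()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of the ACC -> ACC constraint; not Flink API.
public class FoldApplyConstraintSketch {

    // Stand-in for WindowFunction<IN, OUT, K, W>; a long windowEnd replaces the window object.
    interface WindowFn<IN, OUT, K> {
        void apply(K key, long windowEnd, Iterable<IN> input, List<OUT> out);
    }

    // Input AND output of the window step are both ACC, mirroring WindowFunction<ACC, ACC, K, W>.
    static <T, ACC, K> List<ACC> foldThenApply(K key, long windowEnd, List<T> elements,
            ACC initial, BiFunction<ACC, T, ACC> fold, WindowFn<ACC, ACC, K> fn) {
        ACC acc = initial;
        for (T e : elements) {
            acc = fold.apply(acc, e);
        }
        List<ACC> out = new ArrayList<>();
        fn.apply(key, windowEnd, Collections.singletonList(acc), out);
        return out;
    }

    // Plays the role of Tuple3<String, Long, Integer>: key, window end, count.
    static final class CountResult {
        final String key;
        final long windowEnd;
        final int count;
        CountResult(String key, long windowEnd, int count) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    static CountResult countWindow(String key, long windowEnd, List<String> readings) {
        List<CountResult> out = foldThenApply(key, windowEnd, readings,
                new CountResult("", 0L, 0),
                // fold: only the count changes; key and end stay placeholders
                (acc, r) -> new CountResult(acc.key, acc.windowEnd, acc.count + 1),
                // window step: attach key and window end, keep the folded count
                (k, end, counts, o) -> o.add(new CountResult(k, end, counts.iterator().next().count)));
        return out.get(0);
    }

    public static void main(String[] args) {
        CountResult r = countWindow("sensor-1", 60_000L, Arrays.asList("a", "b", "c"));
        System.out.println(r.key + " " + r.windowEnd + " " + r.count); // prints sensor-1 60000 3
    }
}
```

If the accumulator were a plain `Integer` count, the window step could only emit an `Integer`, with no room for the key or window end.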


> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
>                 Key: FLINK-3899
>                 URL: https://issues.apache.org/jira/browse/FLINK-3899
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed with a FoldFunction
> or ReduceFunction followed by a WindowFunction. This combination allows for eager window
> aggregation (only a single element is kept in the window) and access to the Window object,
> e.g., to the window's start and end time.
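The eager aggregation described in the issue can be sketched in plain Java as follows, assuming hypothetical `Reading` and `WindowResult` types, with `start`/`end` standing in for the window's start and end time. The reduce step combines two readings into one of the same type, so at most one element is kept per window; the window-function step then sees that single element together with the window metadata. This models the semantics only; it is not Flink code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of reduce + window function; not Flink API.
public class ReduceThenWindowSketch {

    static final class Reading {
        final String sensorId;
        final double value;
        Reading(String sensorId, double value) {
            this.sensorId = sensorId;
            this.value = value;
        }
    }

    static final class WindowResult {
        final String sensorId;
        final long windowStart;
        final long windowEnd;
        final double maxValue;
        WindowResult(String sensorId, long windowStart, long windowEnd, double maxValue) {
            this.sensorId = sensorId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.maxValue = maxValue;
        }
    }

    // Reduce step: input and output share one type, so the window state is a single Reading.
    static final BinaryOperator<Reading> MAX_BY_VALUE = (a, b) -> a.value >= b.value ? a : b;

    // Assumes at least one reading, since a window only exists once an element has arrived.
    static WindowResult fireWindow(String key, long start, long end, List<Reading> arrivals) {
        Reading state = null; // the single element kept for this window
        for (Reading r : arrivals) {
            state = (state == null) ? r : MAX_BY_VALUE.apply(state, r);
        }
        // Window step: receives the one reduced element plus the window metadata.
        Iterable<Reading> reduced = Collections.singletonList(state);
        Reading max = reduced.iterator().next();
        return new WindowResult(key, start, end, max.value);
    }

    public static void main(String[] args) {
        WindowResult r = fireWindow("sensor-1", 0L, 60_000L, Arrays.asList(
                new Reading("sensor-1", 21.5), new Reading("sensor-1", 23.0), new Reading("sensor-1", 22.1)));
        System.out.println(r.sensorId + " [" + r.windowStart + ", " + r.windowEnd + ") max=" + r.maxValue);
        // prints sensor-1 [0, 60000) max=23.0
    }
}
```

Because a reduce must return its own input type, attaching the window's start and end time has to happen in the window-function step, which is where that metadata is available.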



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
