flink-user mailing list archives

From Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
Subject Re: Behaviour of triggers in Flink
Date Mon, 23 Jul 2018 14:28:36 GMT
Thanks for the response, Hequn. I also see odd behavior with
PurgingTrigger: it skips messages.

Here is the repro:

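// Assumption: Joiner is Guava's; the original message does not show its imports.
import com.google.common.base.Joiner;
import java.util.stream.LongStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                // Emit the numbers 0..100 as fast as possible.
                LongStream.range(0, 101).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
            }
        });

        // 5 ms tumbling windows whose default trigger is replaced by a purging count trigger.
        source.timeWindowAll(Time.milliseconds(5))
                .trigger(PurgingTrigger.of(CountTrigger.of(7)))
                .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Long> values,
                            Collector<Object> collector) throws Exception {
                        System.out.println("processing a window");
                        System.out.println(Joiner.on(',').join(values));
                    }
                })
                .print();

        env.execute("test-program");
    }
}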



processing a window
0,1,2,3,4,5,6
processing a window
10,11,12,13,14,15,16
processing a window
17,18,19,20,21,22,23
processing a window
24,25,26,27,28,29,30
processing a window
31,32,33,34,35,36,37
processing a window
38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51
processing a window
52,53,54,55,56,57,58
processing a window
59,60,61,62,63,64,65
processing a window
66,67,68,69,70,71,72
processing a window
73,74,75,76,77,78,79
processing a window
80,81,82,83,84,85,86
processing a window
87,88,89,90,91,92,93
processing a window
94,95,96,97,98,99,100

It has skipped numbers 7-9. Is this expected behavior?
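
One possible explanation, offered as an assumption rather than a confirmed answer: the count kept by CountTrigger is tracked per window, so if the 5 ms window boundary happened to fall between elements 9 and 10, the first window would fire once for 0-6 and then hold 7-9 without ever reaching a count of 7. Because the count trigger replaced the window's default time-based trigger, those three elements would be discarded when the window expires. The sketch below simulates that mechanism in plain Java without any Flink dependency. The class name, the `run` and `demo` helpers, and the hard-coded timestamps (4 ms for 0-9, 5 ms for 10-100) are all hypothetical and chosen only to mirror the output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation, no Flink dependency: a purging count trigger on tumbling windows. */
public class PurgingCountSimulation {

    /** Batches that fired, plus elements still buffered when their window expired. */
    static final class Result {
        final List<List<Long>> fired = new ArrayList<>();
        final List<Long> dropped = new ArrayList<>();
    }

    static Result run(long[] values, long[] timestampsMs, long windowSizeMs, int maxCount) {
        Result result = new Result();
        // One buffer per window start: the count is tracked per window, not globally.
        Map<Long, List<Long>> buffers = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            long windowStart = timestampsMs[i] - (timestampsMs[i] % windowSizeMs);
            List<Long> buffer = buffers.computeIfAbsent(windowStart, k -> new ArrayList<>());
            buffer.add(values[i]);
            if (buffer.size() == maxCount) {
                result.fired.add(new ArrayList<>(buffer)); // fire: hand the batch downstream
                buffer.clear();                            // purge: reset this window's contents
            }
        }
        // No time-based firing exists here, so anything left in a window is lost.
        for (List<Long> leftover : buffers.values()) {
            result.dropped.addAll(leftover);
        }
        return result;
    }

    /** Assumed timestamps: 0..9 land in window [0, 5), 10..100 land in window [5, 10). */
    static Result demo() {
        long[] values = new long[101];
        long[] timestamps = new long[101];
        for (int i = 0; i <= 100; i++) {
            values[i] = i;
            timestamps[i] = i < 10 ? 4 : 5;
        }
        return run(values, timestamps, 5, 7);
    }

    public static void main(String[] args) {
        Result r = demo();
        r.fired.forEach(batch -> System.out.println("fired: " + batch));
        System.out.println("dropped: " + r.dropped); // dropped: [7, 8, 9]
    }
}
```

With real ingestion timestamps the boundary position depends on wall-clock timing, so which elements are left behind may differ from run to run. A custom trigger that also fires when the window's time is up, as suggested in the reply quoted below, would emit such leftovers instead of dropping them.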

On Sun, Jul 22, 2018 at 9:43 PM Hequn Cheng <chenghequn@gmail.com> wrote:

> Hi Harshvardhan,
>
> By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger for
> TumblingEventTimeWindows you will no longer get window firings based on the
> progress of time but only by count. Right now, you have to write your own
> custom trigger if you want to react based on both time and count.
> More details here[1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#default-triggers-of-windowassigners
>
> On Sun, Jul 22, 2018 at 11:59 PM, Harshvardhan Agrawal <
> harshvardhan.agr93@gmail.com> wrote:
>
>> Hi,
>>
>> I have been trying to understand how triggers work in Flink. We have a
>> set of data that arrives via Kafka. We need to process the data in a
>> window when either of these two criteria is satisfied:
>> 1) The window has reached a maximum number of elements.
>> 2) A maximum time has elapsed (say, 5 milliseconds in our case).
>>
>> I have written the following code:
>>
>> public class WindowTest {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>         DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>             @Override
>>             public void run(SourceContext<Long> ctx) throws Exception {
>>                 LongStream.range(0, 102).forEach(ctx::collect);
>>             }
>>
>>             @Override
>>             public void cancel() {
>>             }
>>         });
>>
>>         source.timeWindowAll(Time.milliseconds(5))
>>                 .trigger(PurgingTrigger.of(CountTrigger.of(15)))
>>                 .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
>>                     @Override
>>                     public void apply(TimeWindow timeWindow, Iterable<Long> values,
>>                             Collector<Object> collector) throws Exception {
>>                         System.out.println("processing a window");
>>                         System.out.println(Joiner.on(',').join(values));
>>                     }
>>                 })
>>                 .print();
>>
>>         env.execute("test-program");
>>     }
>> }
>>
>> Here is the output I get when I run this code:
>>
>> processing a window
>> 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
>> processing a window
>> 15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
>> processing a window
>> 30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
>> processing a window
>> 45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
>> processing a window
>> 60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
>> processing a window
>> 75,76,77,78,79,80,81,82,83,84,85,86,87,88,89
>>
>> As you can see, the data from 90 to 101 is not processed. Shouldn't it be
>> processed when the window is completed after 5 ms?
>>
>> When I remove the trigger part, all of the data does get processed from 0
>> to 101.
>>
>> Any idea why we see this behaviour here?
>> --
>>
>>
>> *Regards, Harshvardhan Agrawal*
>>
>
>

-- 


*Regards, Harshvardhan Agrawal*
