flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Count windows missing last elements?
Date Thu, 21 Apr 2016 12:54:33 GMT
People have wondered about that a few times, yes. My opinion is that a
stream is potentially infinite and processing only stops for exceptional
reasons: the job crashes, or it is stopped so it can be redeployed later.
In those cases you would not want to flush out your data; you would want
to keep it and restart from the same state when the job comes back up.

You can implement the behavior by writing a custom Trigger that behaves
like the count trigger but also fires when it receives a Long.MAX_VALUE
watermark. A watermark of Long.MAX_VALUE signals that a source has
stopped for natural reasons, i.e. the input is exhausted.
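To make the idea concrete, here is a minimal plain-Java sketch of that firing logic. The class and method names (CountOrEndOfStream, onElement, onWatermark) are illustrative, not Flink's actual Trigger API: it buffers elements, fires when the count is reached, and flushes whatever remains when the final Long.MAX_VALUE watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a count trigger that also fires at end of stream.
class CountOrEndOfStream {
  private final int maxCount;
  private final List<Long> buffer = new ArrayList<>();
  private final List<List<Long>> fired = new ArrayList<>();

  CountOrEndOfStream(int maxCount) { this.maxCount = maxCount; }

  void onElement(long value) {
    buffer.add(value);
    if (buffer.size() >= maxCount) {
      fire();
    }
  }

  void onWatermark(long watermark) {
    // Long.MAX_VALUE signals the source finished for natural reasons:
    // flush the incomplete window instead of dropping it.
    if (watermark == Long.MAX_VALUE && !buffer.isEmpty()) {
      fire();
    }
  }

  private void fire() {
    fired.add(new ArrayList<>(buffer));
    buffer.clear();
  }

  List<List<Long>> firedWindows() { return fired; }
}

public class Main {
  public static void main(String[] args) {
    CountOrEndOfStream trigger = new CountOrEndOfStream(10);
    for (long i = 0; i < 35; i++) trigger.onElement(i);
    trigger.onWatermark(Long.MAX_VALUE);
    // Three full windows of 10, plus one flushed window of 5.
    System.out.println(trigger.firedWindows().size());        // 4
    System.out.println(trigger.firedWindows().get(3).size()); // 5
  }
}
```

With this logic, the tail of a finite stream is emitted as a short final window rather than silently discarded.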

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkulagin@gmail.com> wrote:

> Thanks,
>
> I wonder whether it wouldn't be good to have such functionality built in.
> At least when the incoming stream is finished, flush the remaining elements.
>
> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> yes, you can achieve this by writing a custom Trigger that fires either on
>> the count or after a long-enough timeout. It would be a combination of
>> CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could
>> look at those to get started.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>
>>> I have a pretty big but finite stream, and I need to be able to window it
>>> by number of elements.
>>> In this case, from my observations, Flink can 'skip' the last chunk of
>>> data if it has fewer elements than the window size:
>>>
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>       @Override
>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>       }
>>>
>>>       @Override
>>>       public void cancel() {
>>>
>>>       }
>>>     });
>>>
>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>       @Override
>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>         System.out.println(Joiner.on(',').join(values));
>>>       }
>>>     }).print();
>>>
>>>     env.execute("yoyoyo");
>>>
>>> Output:
>>> 0,1,2,3,4,5,6,7,8,9
>>> 10,11,12,13,14,15,16,17,18,19
>>> 20,21,22,23,24,25,26,27,28,29
>>>
>>> I.e. elements 30 to 34 are never processed.
>>>
>>> Does it make sense to have a count-or-timeout window, which would emit a
>>> window when the number of elements reaches a threshold OR when a collection
>>> timeout occurs?
>>>
>>
>
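The count-or-timeout variant asked about in the quoted thread can be sketched the same way. Again this is plain Java with illustrative names, not Flink's Trigger API; time is passed in explicitly so the sketch stays deterministic, whereas a real trigger would register a processing-time timer:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a "count OR timeout" window: fire when maxCount
// elements have arrived, or when timeoutMillis has elapsed since the
// first element of the current window.
class CountOrTimeoutWindow {
  private final int maxCount;
  private final long timeoutMillis;
  private final List<Long> buffer = new ArrayList<>();
  private final List<List<Long>> fired = new ArrayList<>();
  private long windowStart = -1;

  CountOrTimeoutWindow(int maxCount, long timeoutMillis) {
    this.maxCount = maxCount;
    this.timeoutMillis = timeoutMillis;
  }

  void onElement(long value, long nowMillis) {
    if (buffer.isEmpty()) {
      windowStart = nowMillis; // the timeout is measured from the first element
    }
    buffer.add(value);
    if (buffer.size() >= maxCount) {
      fire();
    }
  }

  void onTimer(long nowMillis) {
    if (!buffer.isEmpty() && nowMillis - windowStart >= timeoutMillis) {
      fire();
    }
  }

  private void fire() {
    fired.add(new ArrayList<>(buffer));
    buffer.clear();
  }

  List<List<Long>> firedWindows() { return fired; }
}

public class TimeoutDemo {
  public static void main(String[] args) {
    CountOrTimeoutWindow w = new CountOrTimeoutWindow(10, 1000);
    for (long i = 0; i < 5; i++) w.onElement(i, i); // only 5 elements arrive
    w.onTimer(2000); // timeout elapses, the partial window is emitted
    System.out.println(w.firedWindows()); // [[0, 1, 2, 3, 4]]
  }
}
```

Either firing condition empties the buffer, so a short tail is emitted by the timeout even when the count threshold is never reached.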
