flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Count windows missing last elements?
Date Thu, 21 Apr 2016 15:02:56 GMT
Hi,
if you are not using the windows for their actual semantics, I would suggest
not using count-based windows and also not using the *All windows. The *All
windows are all non-parallel, i.e. you always get only one parallel instance
of your window operator, even if you have a huge cluster.

Also, in most cases it is better not to use a plain WindowFunction with
apply(), because all elements have to be buffered so that they can be passed
as an Iterable (Iterable<Long> in your example). If you can, I would suggest
using a ReduceFunction or FoldFunction, or an apply() with an incremental
aggregation function: apply(ReduceFunction, WindowFunction) or
apply(FoldFunction, WindowFunction). These aggregate the result incrementally
as elements arrive and don't require buffering all elements until the window
fires.

Cheers,
Aljoscha
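
To illustrate the buffering versus incremental-aggregation point above, here
is a minimal plain-Java sketch. It deliberately does not use Flink's API: the
class IncrementalWindowSketch and the methods sumBuffered and sumIncremental
are hypothetical names chosen for this illustration, and the "window" is just
an array. It only shows why a reduce-style function needs a single
accumulator while an Iterable-based function needs every element held in
memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; this is not Flink code.
public class IncrementalWindowSketch {

    // Mimics a plain window function: every element is buffered first,
    // and the result is computed only when the window "fires".
    public static long sumBuffered(long[] window) {
        List<Long> buffer = new ArrayList<>();
        for (long v : window) {
            buffer.add(v); // state grows with the number of elements
        }
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        return sum;
    }

    // Mimics a reduce-style function: only one accumulator is kept,
    // and it is updated as each element arrives.
    public static long sumIncremental(long[] window, BinaryOperator<Long> reduce) {
        Long acc = null;
        for (long v : window) {
            acc = (acc == null) ? v : reduce.apply(acc, v);
        }
        return acc == null ? 0L : acc;
    }

    public static void main(String[] args) {
        long[] window = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(sumBuffered(window));
        System.out.println(sumIncremental(window, Long::sum));
    }
}
```

Both methods return the same value for the same input; the difference is
only in how much state is held before the result is produced.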

On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkulagin@gmail.com> wrote:

> Maybe, if this is not the first time it has come up, it is worth
> considering adding this as an option? ;-)
>
> My use case: I have a pretty large amount of data, basically for ETL. It is
> finite but it is big. I see it more as a stream than as a dataset. Also, I
> would re-use the same code for an infinite stream later...
> And I do not much care about the exact window size; I create the windows
> just for performance reasons.
>
> Anyway, thank you for the responses!
>
>
> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> People have wondered about that a few times, yes. My opinion is that a
>> stream is potentially infinite and processing only stops for anomalous
>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>> those cases you would not want to flush out your data but keep them and
>> restart from the same state when the job is restarted.
>>
>> You can implement the behavior by writing a custom Trigger that behaves
>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>> stopped processing for natural reasons.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>
>>> Thanks,
>>>
>>> I wonder whether it would be good to have such functionality built in:
>>> at least flush the remaining elements when the incoming stream finishes.
>>>
>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> yes, you can achieve this by writing a custom Trigger that fires either
>>>> on the count or after a long-enough timeout. It would be a combination
>>>> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you
>>>> could look at those to get started.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>>
>>>>> I have a pretty big but finite stream and I need to be able to window
>>>>> it by number of elements.
>>>>> In this case, from my observations, Flink can 'skip' the last chunk of
>>>>> data if it has fewer elements than the window size:
>>>>>
>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>
>>>>>       @Override
>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public void cancel() {
>>>>>
>>>>>       }
>>>>>     });
>>>>>
>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>       @Override
>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>         System.out.println(Joiner.on(',').join(values));
>>>>>       }
>>>>>     }).print();
>>>>>
>>>>>     env.execute("yoyoyo");
>>>>>
>>>>>
>>>>> Output:
>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>
>>>>> I.e. elements 30 to 34 are not being processed.
>>>>>
>>>>> Would it make sense to have a count-OR-timeout window that fires
>>>>> when the number of elements reaches a threshold OR a collection
>>>>> timeout occurs?
>>>>>
>>>>
>>>
>
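
The behaviour discussed in this thread (a count window that drops a trailing
partial window unless something fires it when the input ends) can be
simulated with a minimal plain-Java sketch. It does not use Flink's Trigger
API: the class CountOrEndOfInputSketch, the method countWindows and the
flushAtEnd flag are hypothetical names for this illustration. In Flink the
end-of-input signal would be the Long.MAX_VALUE watermark mentioned in the
thread; here it is simply the end of the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical illustration only; this is not Flink code.
public class CountOrEndOfInputSketch {

    // Groups the input into windows of `size` elements. A window fires when
    // it is full. If flushAtEnd is true, a non-empty partial window also
    // fires once the input is exhausted.
    public static List<List<Long>> countWindows(List<Long> input, int size, boolean flushAtEnd) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (Long value : input) {
            current.add(value);
            if (current.size() == size) { // count condition reached
                fired.add(current);
                current = new ArrayList<>();
            }
        }
        if (flushAtEnd && !current.isEmpty()) { // end-of-input condition
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> input = LongStream.range(0, 35).boxed().collect(Collectors.toList());
        // Plain count windows: the partial window 30..34 never fires.
        System.out.println(countWindows(input, 10, false).size()); // 3
        // Count windows that also fire at end of input.
        System.out.println(countWindows(input, 10, true).size()); // 4
    }
}
```

With flushAtEnd set to false, this reproduces the three windows shown in the
output above; with it set to true, a fourth window holding elements 30 to 34
is emitted as well.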
