flink-user mailing list archives

From Kostya Kulagin <kkula...@gmail.com>
Subject Re: Count windows missing last elements?
Date Thu, 21 Apr 2016 16:26:39 GMT
Thanks for the reply.

Maybe I need some advice in this case. My situation: we have a stream
of data, generally speaking <Long;String> tuples, where the long is a unique
key (i.e. there are no tuples with the same key).

I need to filter out all tuples that do not match a certain Lucene query.

Creating a Lucene index for a single entry is too expensive, and I cannot
predict what the load, in terms of entries per second, will be. The idea was
to group entries by count, create an index, filter, and stream the remaining
tuples on for further processing.

As a sample application, if we replace Lucene indexing with something like
String's 'contains' method, the source would look like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
  @Override
  public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
    // Emit 30 tuples with unique keys, then finish.
    LongStream.range(0, 30).forEach(l -> {
      ctx.collect(Tuple2.of(l, "This is " + l));
    });
  }

  @Override
  public void cancel() {
  }
});


And I need, let's say, to window the tuples and preserve only those which
There is no grouping by key, since basically all keys are different. I
might not know everything about Flink yet, but for this particular example,
does what you were saying make sense?
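
The batching idea described above can be sketched in plain Java, without any Flink classes. This is only an illustration under stated assumptions: `filterBatch` is a hypothetical stand-in for "build a Lucene index over one batch and query it", and the class and method names are invented for this example. The loop also processes the final, possibly smaller, batch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java stand-in for the batching idea; no Flink or Lucene classes are used.
class BatchFilterSketch {

    // Hypothetical replacement for indexing a batch and running a query on it.
    static List<Map.Entry<Long, String>> filterBatch(List<Map.Entry<Long, String>> batch, String needle) {
        return batch.stream()
                .filter(e -> e.getValue().contains(needle))
                .collect(Collectors.toList());
    }

    // Splits the input into batches of batchSize and filters each batch,
    // including the last batch even when it is smaller than batchSize.
    static List<Map.Entry<Long, String>> process(List<Map.Entry<Long, String>> input, int batchSize, String needle) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (int start = 0; start < input.size(); start += batchSize) {
            int end = Math.min(start + batchSize, input.size());
            out.addAll(filterBatch(input.subList(start, end), needle));
        }
        return out;
    }

    // Builds n tuples (key, "This is " + key), mirroring the sample source.
    static List<Map.Entry<Long, String>> sampleInput(int n) {
        List<Map.Entry<Long, String>> input = new ArrayList<>();
        for (long l = 0; l < n; l++) {
            input.add(new SimpleEntry<>(l, "This is " + l));
        }
        return input;
    }

    public static void main(String[] args) {
        // Keeps the tuples whose text contains "3": keys 3, 13, 23 and 30..34.
        process(sampleInput(35), 10, "3")
                .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```

The per-batch cost is paid once per `batchSize` entries rather than once per entry, which is the only reason for batching here.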


On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> if you are not using the windows for their actual semantics, I would
> suggest not using count-based windows and also not using the *All windows.
> The *All windows are all non-parallel, i.e. you always only get one
> parallel instance of your window operator even if you have a huge cluster.
> Also, in most cases it is better to not use a plain WindowFunction with
> apply because all elements have to be buffered so that they can be passed
> as an Iterable, Iterable<Long> in your example. If you can, I would suggest
> using a ReduceFunction or FoldFunction, or an apply() with an incremental
> aggregation function: apply(ReduceFunction, WindowFunction) or
> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
> the result as elements arrive and don't require buffering of all elements
> until the window fires.
> Cheers,
> Aljoscha
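
The difference between buffering all elements and aggregating incrementally, as described in the quoted reply, can be illustrated with a small plain-Java sketch. It does not use Flink's ReduceFunction or WindowFunction; `RunningReduce` and `bufferedSum` are hypothetical names chosen for this example, and a sum is assumed as the aggregation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration; these are not Flink's ReduceFunction/WindowFunction types.
class IncrementalWindowSketch {

    // Buffered style: every element is kept until the window fires,
    // so the state grows with the window size.
    static long bufferedSum(List<Long> windowElements) {
        List<Long> buffer = new ArrayList<>(windowElements);
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        return sum;
    }

    // Incremental style: only one running value is kept, and it is
    // updated as each element arrives.
    static final class RunningReduce<T> {
        private final BinaryOperator<T> reducer;
        private T current; // null until the first element arrives

        RunningReduce(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        T result() {
            return current;
        }
    }

    // Feeds 0..n-1 through a running sum and returns the final value.
    static long incrementalSum(int n) {
        RunningReduce<Long> running = new RunningReduce<>(Long::sum);
        for (long l = 0; l < n; l++) {
            running.add(l);
        }
        return running.result();
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>();
        for (long l = 0; l < 10; l++) {
            window.add(l);
        }
        System.out.println(bufferedSum(window));   // sum of 0..9
        System.out.println(incrementalSum(10));    // same value, constant-size state
    }
}
```

Both variants produce the same result; the incremental one only works when the computation can be expressed as a pairwise combination of values.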
> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkulagin@gmail.com> wrote:
>> Maybe, if it is not the first time, it is worth considering adding this
>> as an option? ;-)
>> My use case: I have a pretty big amount of data, basically for ETL. It is
>> finite but it is big. I see it more as a stream than as a dataset. Also, I
>> would re-use the same code for an infinite stream later...
>> And I do not much care about the exact window size; I create windows just
>> for performance reasons.
>> Anyway, thank you for the responses!
>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>> People have wondered about that a few times, yes. My opinion is that a
>>> stream is potentially infinite and processing only stops for anomalous
>>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>>> those cases you would not want to flush out your data but keep them and
>>> restart from the same state when the job is restarted.
>>> You can implement the behavior by writing a custom Trigger that behaves
>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>> stopped processing for natural reasons.
>>> Cheers,
>>> Aljoscha
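
The trigger behaviour suggested in the quoted reply (fire on count, and also fire when a Long.MAX_VALUE watermark arrives) can be simulated in plain Java. This is a sketch of the decision logic only, not an implementation of Flink's Trigger interface; the class and its `onElement`/`onWatermark` methods are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the trigger logic; these are not Flink's Trigger classes.
class CountOrEndOfStreamSketch {

    private final int maxCount;
    private final List<Long> buffer = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();

    CountOrEndOfStreamSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // Count part: fire and purge once maxCount elements have arrived.
    void onElement(long value) {
        buffer.add(value);
        if (buffer.size() >= maxCount) {
            fire();
        }
    }

    // Extra rule: a Long.MAX_VALUE watermark means the source has finished,
    // so whatever is still buffered is fired as a final, smaller window.
    void onWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && !buffer.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Sends 0..elements-1 followed by the end-of-stream watermark.
    static List<List<Long>> run(int elements, int maxCount) {
        CountOrEndOfStreamSketch window = new CountOrEndOfStreamSketch(maxCount);
        for (long l = 0; l < elements; l++) {
            window.onElement(l);
        }
        window.onWatermark(Long.MAX_VALUE);
        return window.fired;
    }

    public static void main(String[] args) {
        // Three full windows of 10, then a final window holding 30..34.
        run(35, 10).forEach(System.out::println);
    }
}
```

With 35 elements and a count of 10, the final window holds the five elements that a pure count trigger would leave unprocessed.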
>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>> Thanks,
>>>> I wonder whether it would be good to have such functionality built in: at
>>>> least when the incoming stream is finished, flush the remaining elements.
>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>> Hi,
>>>>> yes, you can achieve this by writing a custom Trigger that can trigger
>>>>> either on the count or after a long-enough timeout. It would be a combination
>>>>> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
>>>>> could look to those to get started.
>>>>> Cheers,
>>>>> Aljoscha
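
The "count OR timeout" combination described in the quoted reply can be sketched as follows. This is a plain-Java simulation with explicit timestamps, not Flink's CountTrigger, EventTimeTrigger or ProcessingTimeTrigger; all names are hypothetical, and it assumes the timeout is measured from the first element of the current window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a count-or-timeout trigger; not Flink's Trigger API.
class CountOrTimeoutSketch {

    private final int maxCount;
    private final long timeoutMillis;
    private final List<Long> buffer = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();
    private long deadline = Long.MAX_VALUE; // no timer while the buffer is empty

    CountOrTimeoutSketch(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    // Starts the timer on the first element of a window and fires on count.
    void onElement(long value, long nowMillis) {
        if (buffer.isEmpty()) {
            deadline = nowMillis + timeoutMillis;
        }
        buffer.add(value);
        if (buffer.size() >= maxCount) {
            fire();
        }
    }

    // Fires a partial window once the deadline has passed.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis >= deadline) {
            fire();
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
        deadline = Long.MAX_VALUE;
    }

    // Element l arrives at time l (ms); one timer callback follows at timerMillis.
    static List<List<Long>> run(int elements, int maxCount, long timeoutMillis, long timerMillis) {
        CountOrTimeoutSketch window = new CountOrTimeoutSketch(maxCount, timeoutMillis);
        for (long l = 0; l < elements; l++) {
            window.onElement(l, l);
        }
        window.onTimer(timerMillis);
        return window.fired;
    }

    public static void main(String[] args) {
        // The timer at 2000 ms is past the deadline, so 30..34 are fired as well.
        run(35, 10, 1000, 2000).forEach(System.out::println);
    }
}
```

A timer callback that arrives before the deadline leaves the partial window buffered, so full windows are still preferred whenever the input rate is high enough.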
>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkulagin@gmail.com>
>>>>> wrote:
>>>>>> I have a pretty big but finite stream and I need to be able to window
>>>>>> it by number of elements.
>>>>>> In this case, from my observations, Flink can 'skip' the last chunk
>>>>>> of data if it has fewer elements than the window size:
>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>       @Override
>>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>       }
>>>>>>       @Override
>>>>>>       public void cancel() {
>>>>>>       }
>>>>>>     });
>>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>       @Override
>>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>>         System.out.println(Joiner.on(',').join(values));
>>>>>>       }
>>>>>>     }).print();
>>>>>>     env.execute("yoyoyo");
>>>>>> Output:
>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>> I.e. elements from 30 to 34 are not being processed.
>>>>>> Does it make sense to have a count OR timeout window, which would emit a
>>>>>> new window when the number of elements reaches a threshold OR a
>>>>>> collecting timeout occurs?
