flink-user mailing list archives

From Konstantin Kulagin <kkula...@gmail.com>
Subject Re: Count windows missing last elements?
Date Fri, 22 Apr 2016 14:14:46 GMT
No problem at all; there are not many Flink people and a lot of people asking,
so it must be hard to understand everyone's issues :)


Yes, it is not as easy as a 'contains' operator: I need to collect some
number of tuples in order to create an in-memory Lucene index. After that I
will filter entries based on some predefined query.

So, in a simplified case:
   -> for a window of tuples (preferably based on element count)
   -> apply some operation to all elements in the window (creating an index in
my case, but let's say string concatenation would work as well, i.e. any
operation that involves all of the window's tuples and produces some resulting
data would work)
   -> filter each of the window's elements based on the result of this
all-window-elements operation
   -> emit the filtered tuples

It might be a bit hard to understand. If it is, never mind.
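The steps above have the shape "collect a batch, compute something from the whole batch, then filter each element against that result". A minimal plain-Java sketch of that shape (no Flink dependency), assuming the batch has already been collected: the hypothetical `buildIndex` builds a toy term-to-keys map as a stand-in for the in-memory Lucene index, and `processBatch` queries it once and keeps only the matching tuples. Unlike `contains`, it matches whole terms, so the query "3" matches "This is 3" but not "This is 13".

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BatchFilter {
    // Toy stand-in for an in-memory Lucene index: term -> keys of the elements containing it.
    static Map<String, Set<Long>> buildIndex(List<Map.Entry<Long, String>> batch) {
        Map<String, Set<Long>> index = new HashMap<>();
        for (Map.Entry<Long, String> e : batch) {
            for (String term : e.getValue().split("\\s+")) {
                index.computeIfAbsent(term, t -> new HashSet<>()).add(e.getKey());
            }
        }
        return index;
    }

    // Index the whole batch once, run the query once, then filter each element against the result.
    static List<Map.Entry<Long, String>> processBatch(List<Map.Entry<Long, String>> batch, String queryTerm) {
        Set<Long> hits = buildIndex(batch).getOrDefault(queryTerm, Collections.emptySet());
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : batch) {
            if (hits.contains(e.getKey())) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One "window" of ten tuples, like the first window of the sample source.
        List<Map.Entry<Long, String>> batch = new ArrayList<>();
        for (long l = 0; l < 10; l++) {
            batch.add(new SimpleEntry<>(l, "This is " + l));
        }
        System.out.println(processBatch(batch, "3")); // prints [3=This is 3]
    }
}
```

The expensive step (building the index) runs once per batch rather than once per element, which is the motivation for batching in the first place.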



On Fri, Apr 22, 2016 at 9:27 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi,
> I'm afraid I don't understand your use case yet. In your example you want
> to preserve only the elements where the string value contains a "3"? This
> can be done using a filter, as in
>
> source.filter( value -> value.f1.contains("3") )
>
> This is probably too easy, though, and I'm misunderstanding the problem.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 18:26 Kostya Kulagin <kkulagin@gmail.com> wrote:
>
>> Thanks for reply.
>>
>> Maybe I need some advice in this case. My situation: we have a
>> stream of data, generally speaking <Long;String> tuples, where the Long is a
>> unique key (i.e. there are no tuples with the same key).
>>
>> I need to filter out all tuples that do not match a certain Lucene query.
>>
>> Creating a Lucene index for a single entry is too expensive, and I cannot
>> predict the load in terms of entries per second. The idea was to group
>> entries by count, create an index, filter, and stream the remaining tuples
>> for further processing.
>>
>> As a sample application, if we replace Lucene indexing with something like
>> String's 'contains' method, the source would look like this:
>>
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>>   @Override
>>   public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>>     // emit 30 tuples: (0, "This is 0") ... (29, "This is 29"), then return
>>     LongStream.range(0, 30).forEach(l -> ctx.collect(Tuple2.of(l, "This is " + l)));
>>   }
>>
>>   @Override
>>   public void cancel() {
>>     // nothing to do: run() finishes on its own after emitting all elements
>>   }
>> });
>>
>> And I need, let's say, to window the tuples and preserve only those where
>> value.contains("3").
>> There is no grouping by key since basically all keys are different. I
>> might not know everything about Flink yet, but for this particular example,
>> does what you were saying make sense?
>>
>>
>> Thanks!
>> Kostya
>>
>>
>>
>>
>>
>>
>> On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>
>>> Hi,
>>> if you are not using the windows for their actual semantics, I would
>>> suggest not using count-based windows and also not using the *All windows.
>>> The *All windows are all non-parallel, i.e. you always get only one
>>> parallel instance of your window operator, even if you have a huge cluster.
>>>
>>> Also, in most cases it is better not to use a plain WindowFunction with
>>> apply(), because all elements have to be buffered so that they can be passed
>>> as an Iterable (Iterable<Long> in your example). If you can, I would suggest
>>> using a ReduceFunction or FoldFunction, or an apply() with an incremental
>>> aggregation function: apply(ReduceFunction, WindowFunction) or
>>> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
>>> the result as elements arrive and don't require buffering all elements
>>> until the window fires.
>>>
>>> Cheers,
>>> Aljoscha
>>>
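To make the buffering point concrete, here is a plain-Java model (not Flink's API) of a count window that aggregates incrementally: each element is folded into one running accumulator as it arrives, so the per-window state is a single value plus a counter rather than a list of every element. The class name `IncrementalCountWindow` and the choice of a sum as the aggregation are illustrative assumptions; in Flink terms this corresponds to the ReduceFunction approach suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class IncrementalCountWindow<T> {
    private final int size;
    private final BinaryOperator<T> reducer;
    private T acc;    // running aggregate: the only per-window state besides the count
    private int count;
    private final List<T> results = new ArrayList<>();

    IncrementalCountWindow(int size, BinaryOperator<T> reducer) {
        this.size = size;
        this.reducer = reducer;
    }

    // Fold the element in immediately instead of buffering it until the window fires.
    void add(T element) {
        acc = (count == 0) ? element : reducer.apply(acc, element);
        count++;
        if (count == size) {
            results.add(acc); // window is full: emit the aggregate and reset
            acc = null;
            count = 0;
        }
    }

    List<T> results() {
        return results;
    }

    public static void main(String[] args) {
        IncrementalCountWindow<Long> w = new IncrementalCountWindow<>(10, Long::sum);
        for (long l = 0; l < 35; l++) {
            w.add(l);
        }
        System.out.println(w.results()); // prints [45, 145, 245]
    }
}
```

With the 35-element input from the original example, only three results are produced: the last five elements (30 to 34) never complete a window, which is the behavior that started this thread.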
>>> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>
>>>> Maybe, if this is not the first time it has come up, it is worth
>>>> considering adding this as an option? ;-)
>>>>
>>>> My use case: I have a pretty big amount of data, basically for ETL. It
>>>> is finite, but it is big. I see it more as a stream than as a dataset.
>>>> Also, I would like to reuse the same code for an infinite stream later...
>>>> And I do not care much about the exact window size; I create windows only
>>>> for performance reasons.
>>>>
>>>> Anyway, thank you for the responses!
>>>>
>>>>
>>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>
>>>>> People have wondered about that a few times, yes. My opinion is that a
>>>>> stream is potentially infinite and processing only stops for anomalous
>>>>> reasons: when the job crashes, or when stopping a job to redeploy it later.
>>>>> In those cases you would not want to flush out your data but keep it and
>>>>> restart from the same state when the job is restarted.
>>>>>
>>>>> You can implement the behavior by writing a custom Trigger that
>>>>> behaves like the count trigger but also fires when receiving a
>>>>> Long.MAX_VALUE watermark. A watermark of Long.MAX_VALUE signifies that a
>>>>> source has stopped processing for natural reasons.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
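The firing rule described above (count OR end of input) can be modeled in a few lines of plain Java. This is a simulation of the decision logic only, not a Flink Trigger: the hypothetical `onElement` and `onWatermark` methods stand in for the callbacks a real custom Trigger would implement, and the final Long.MAX_VALUE watermark is passed in by hand rather than coming from a finished source. The Trigger interface has changed between Flink versions, so check the documentation for the version you use before porting this.

```java
import java.util.ArrayList;
import java.util.List;

class CountOrEndOfInputWindow {
    private final int maxCount;
    private final List<Long> buffer = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();

    CountOrEndOfInputWindow(int maxCount) {
        this.maxCount = maxCount;
    }

    // Like a count trigger: fire and purge once maxCount elements have arrived.
    void onElement(long value) {
        buffer.add(value);
        if (buffer.size() >= maxCount) {
            fireAndPurge();
        }
    }

    // A Long.MAX_VALUE watermark means the source finished normally: flush whatever is left.
    void onWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && !buffer.isEmpty()) {
            fireAndPurge();
        }
    }

    private void fireAndPurge() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<Long>> fired() {
        return fired;
    }

    public static void main(String[] args) {
        CountOrEndOfInputWindow w = new CountOrEndOfInputWindow(10);
        for (long l = 0; l < 35; l++) {
            w.onElement(l);
        }
        w.onWatermark(Long.MAX_VALUE); // models the final watermark of a finite source
        for (List<Long> batch : w.fired()) {
            System.out.println(batch);
        }
    }
}
```

Run on the 0..34 input from the original question, this prints four batches; the last one is [30, 31, 32, 33, 34], the chunk that a plain count window never emits.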
>>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> I wonder whether it would be good to have such functionality built in,
>>>>>> at least flushing the remaining elements when the incoming stream is finished.
>>>>>>
>>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> yes, you can achieve this by writing a custom Trigger that can
>>>>>>> fire either on the count or after a long-enough timeout. It would be a
>>>>>>> combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger),
>>>>>>> so you could look at those to get started.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
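A similar plain-Java model of the count-OR-timeout rule suggested here. Again this is a simulation, not Flink code: time is passed in explicitly as milliseconds, a pending "timer" is represented by a single `deadline` field that is set when the first element of a batch arrives, and the class name `CountOrTimeoutWindow` is made up for illustration. A real Trigger would register a processing-time or event-time timer instead of checking a field.

```java
import java.util.ArrayList;
import java.util.List;

class CountOrTimeoutWindow {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> fired = new ArrayList<>();
    private long deadline = Long.MAX_VALUE; // no "timer" pending while the buffer is empty

    CountOrTimeoutWindow(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    void onElement(String value, long nowMillis) {
        onTime(nowMillis); // an expired batch would already have fired before this element arrived
        if (buffer.isEmpty()) {
            deadline = nowMillis + timeoutMillis; // first element of a batch starts the timeout
        }
        buffer.add(value);
        if (buffer.size() >= maxCount) {
            fire(); // count condition
        }
    }

    // Called as time advances; fires a non-empty batch once its deadline has passed.
    void onTime(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis >= deadline) {
            fire(); // timeout condition
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
        deadline = Long.MAX_VALUE;
    }

    List<List<String>> fired() {
        return fired;
    }

    public static void main(String[] args) {
        CountOrTimeoutWindow w = new CountOrTimeoutWindow(3, 100);
        w.onElement("a", 0);
        w.onElement("b", 10);
        w.onElement("c", 20); // third element: fires [a, b, c] by count
        w.onElement("d", 30); // starts a new batch with deadline 130
        w.onTime(200);        // deadline passed: fires [d] by timeout
        System.out.println(w.fired()); // prints [[a, b, c], [d]]
    }
}
```

Because the timeout only starts with the first element of a batch, an idle stream produces no empty windows, and a small trailing batch is emitted at most `timeoutMillis` after it began.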
>>>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have a pretty big but finite stream, and I need to be able to
>>>>>>>> window it by number of elements.
>>>>>>>> In this case, from my observations, Flink can 'skip' the last chunk
>>>>>>>> of data if it has fewer elements than the window size:
>>>>>>>>
>>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>>>         // finite source: emits 0..34, then returns
>>>>>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public void cancel() {
>>>>>>>>
>>>>>>>>       }
>>>>>>>>     });
>>>>>>>>
>>>>>>>>     // non-parallel count window of 10 elements; prints each window's contents
>>>>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>>>       @Override
>>>>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>>>>         System.out.println(Joiner.on(',').join(values));
>>>>>>>>       }
>>>>>>>>     }).print();
>>>>>>>>
>>>>>>>>     env.execute("yoyoyo");
>>>>>>>>
>>>>>>>>
>>>>>>>> Output:
>>>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>>>>
>>>>>>>> I.e. elements 30 to 34 are not being processed.
>>>>>>>>
>>>>>>>> Does it make sense to have a count-OR-timeout window, which fires
>>>>>>>> when the number of elements reaches a threshold OR a collection
>>>>>>>> timeout occurs?
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>
