flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: emit a single Map<String, T> per window
Date Mon, 05 Sep 2016 11:30:39 GMT
Hi,
for this you would have to use a non-parallel window, i.e. something like
stream.windowAll(<my window>).apply(...). This does not compute per key but
has the drawback that computation does not happen in parallel. If you only
use it to combine the pre-aggregated maps it could be OK, though.

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <mariano@event-fabric.com>
wrote:

> On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> from this I would expect to get as many HashMaps as you have keys. The
>> winFunction is also executed per-key so it cannot combine the HashMaps of
>> all keys.
>>
>> Does this describe the behavior that you're seeing?
>>
>
> yes, it's the behaviour I'm seeing, I'm looking for a way to merge those
> HashMaps from the same window into a single one, I can't find how.
>
>
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <mariano@event-fabric.com>
>> wrote:
>>
>>> hi!
>>>
>>> I'm trying to collect some metrics by key per window and emiting the
>>> full result at the end of the window to kafka, I started with a simple
>>> count by key to test it but my requirements are a little more complex than
>>> that.
>>>
>>> what I want to do is to fold the stream events as they come and then at
>>> the end of the window merge them together and emit a singe result, I don't
>>> want to accumulate all the events and calculate at the end of the window,
>>> from my understanding of fold in other languages/libraries, this would be
>>> what I need, using it via apply(stateIn, foldFun, windowFun) but it's not
>>> working:
>>>
>>> the basic is:
>>>
>>>     input
>>>                 .flatMap(new LineSplitter())
>>>                 .keyBy(0)
>>>                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>                 .apply(new HashMap<String, Integer>(), foldFunction,
>>> winFunction);
>>>
>>> where foldFunction accumulates by key and winFunction iterate over the
>>> hasmaps and merges them into a single result hashmap and emits that one at
>>> the end.
>>>
>>> this emits many one-key hash maps instead of only one with all the keys,
>>> I tried setting setParallelism(1) in multiple places but still doesn't
>>> work. More confusingly, in one run it emited a single map but after I ran
>>> it again it went back to the previous behavior.
>>>
>>> what I'm doing wrong? is there any other approach?
>>>
>>> I can provide the implementation of foldFunction and winFunction if
>>> required but I think it doesn't change much.
>>>
>>> Reading the source code I see:
>>>
>>>     Applies the given window function to each window. The window
>>> function is called for each evaluation of the window for each key
>>> individually. The output of the window function is interpreted as a regular
>>> non-windowed stream.
>>>
>>> emphasis on " for each key individually", the return type of apply is
>>> SingleOutputStreamOperator which doesn't provide many operations to group
>>> the emited values.
>>>
>>> thanks in advance.
>>>
>>

Mime
View raw message