flink-user mailing list archives

From Matt <dromitl...@gmail.com>
Subject Re: Multiple consumers and custom triggers
Date Thu, 15 Dec 2016 00:02:08 GMT
Hey Jamie,

Ok with #1. I guess #2 is just not possible.

I got it about #3. I just checked the code for the tumbling window assigner
and noticed that a custom trigger only overrides the assigner's default
trigger, not the way it assigns windows. It makes sense now.

Regarding #4, after doing some more tests I think it's more complex than I
first thought. I'll probably create another thread explaining that specific
question in more detail.

Thanks,
Matt

On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <jamie@data-artisans.com>
wrote:

> For #1 there are a couple of ways to do this.  The easiest is probably
> stream1.connect(stream2).map(...) where the MapFunction maps the two
> input types to a common type that you can then process uniformly.
>
> For #3 there must always be a WindowAssigner specified.  There are some
> convenient ways to do this in the API, such as timeWindow() or
> window(TumblingProcessingTimeWindows.of(...)), but you must always do
> this whether you provide your own trigger implementation or not.  The way
> to use window(...) with a custom trigger is just
> stream.keyBy(...).window(...).trigger(...).apply(...) or something
> similar.  Not sure if I answered your question, though.
>
> For #4: If I understand you correctly, that is exactly what
> countWindow(10, 1) does already.  For example, if your input data were a
> sequence of integers starting with 0, the output would be:
>
> (0)
> (0, 1)
> (0, 1, 2)
> (0, 1, 2, 3)
> (0, 1, 2, 3, 4)
> (0, 1, 2, 3, 4, 5)
> (0, 1, 2, 3, 4, 5, 6)
> (0, 1, 2, 3, 4, 5, 6, 7)
> (0, 1, 2, 3, 4, 5, 6, 7, 8)
> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
> ...
> etc
>
> -Jamie
>
>
> On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitlabs@gmail.com> wrote:
>
>> Hello people,
>>
>> I've written down some quick questions for which I couldn't find much or
>> anything in the documentation. I hope you can answer some of them!
>>
>> *# Multiple consumers*
>>
>> *1.* Is it possible to .union() streams of different classes? This would
>> be useful, for example, for creating a consumer that counts elements on
>> different topics, using a key such as the class name of the element and a
>> tumbling window of, say, 5 minutes.
>>
>> *2.* In case #1 is not possible, I need to launch multiple consumers to
>> achieve the same effect. However, I'm getting a "Factory already
>> initialized" error if I run environment.execute() for two consumers on
>> different threads. How do you .execute() more than one consumer on the same
>> application?
>>
>> *# Custom triggers*
>>
>> *3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
>> used previously, why do we have to specify a WindowAssigner (such as
>> TumblingProcessingTimeWindows) in order to be able to specify a custom
>> trigger? Shouldn't it be possible to send a trigger to .window()?
>>
>> *4.* I need a stream with a CountWindow (size 10, slide 1 let's say)
>> that may take more than 10 hours to fill for the first time, but in the
>> meantime I want to process whatever elements have already arrived. I guess
>> the way to do this is to create a custom trigger that fires on every new
>> element, with up to 10 elements at a time. The result would be windows of
>> sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10, .... Is there a way to
>> achieve this with predefined triggers, or is a custom trigger the only way
>> to go here?
>>
>> Best regards,
>> Matt
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> jamie@data-artisans.com
>
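The idea behind the answer to #1 (map both input types to one common type, then process uniformly) can be sketched in plain Java. This is not Flink code: the Click and Purchase classes, the Keyed wrapper and the helper names are all hypothetical, and in a real job the two mapping methods would presumably sit in the map1/map2 methods of a CoMapFunction applied after connect().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch only; no Flink classes are used and all names are hypothetical.
class CommonTypeSketch {

    // Two unrelated element types, standing in for records from two topics.
    static class Click {
        final String url;
        Click(String url) { this.url = url; }
    }

    static class Purchase {
        final double amount;
        Purchase(double amount) { this.amount = amount; }
    }

    // The common type both inputs are mapped to: just a key to count by.
    static class Keyed {
        final String key;
        Keyed(String key) { this.key = key; }
    }

    // One mapping per input type, both producing the common type.
    static Keyed fromClick(Click c) {
        return new Keyed(Click.class.getSimpleName());
    }

    static Keyed fromPurchase(Purchase p) {
        return new Keyed(Purchase.class.getSimpleName());
    }

    // After mapping, both inputs can be merged and counted uniformly by key.
    static Map<String, Integer> countByKey(List<Click> clicks, List<Purchase> purchases) {
        List<Keyed> merged = new ArrayList<>();
        for (Click c : clicks) merged.add(fromClick(c));
        for (Purchase p : purchases) merged.add(fromPurchase(p));

        Map<String, Integer> counts = new TreeMap<>();
        for (Keyed k : merged) counts.merge(k.key, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(new Click("/a"), new Click("/b"));
        List<Purchase> purchases = List.of(new Purchase(9.99));
        System.out.println(countByKey(clicks, purchases));
    }
}
```

The sketch counts over the whole input at once; the 5-minute tumbling window from the question is left out to keep it short.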
>
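The output listed for #4 (a window of size 10 sliding by 1 that fires on every element, even before it is full) can be reproduced with a small plain-Java simulation. This is only a sketch of the behaviour described in the thread, not Flink's implementation; the class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation only; no Flink classes are used and all names are hypothetical.
class SlidingCountWindowSketch {

    // Returns one window snapshot per incoming element (slide 1),
    // each holding at most the `size` most recent elements.
    static List<List<Integer>> windows(List<Integer> input, int size) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        for (int element : input) {
            buffer.addLast(element);
            if (buffer.size() > size) {
                buffer.removeFirst(); // drop the oldest element once the window is full
            }
            fired.add(new ArrayList<>(buffer)); // fire on every new element
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 14; i++) {
            input.add(i);
        }
        for (List<Integer> window : windows(input, 10)) {
            System.out.println(window);
        }
    }
}
```

With the integers 0 through 13 as input, the snapshots grow from one element to ten and then slide forward one element at a time, matching the sizes 1, 2, ..., 10, 10, 10 asked about in the original question.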
