flink-user mailing list archives

From Jamie Grier <ja...@data-artisans.com>
Subject Re: Multiple consumers and custom triggers
Date Wed, 14 Dec 2016 17:52:09 GMT
For #1 there are a couple of ways to do this. The easiest is probably
stream1.connect(stream2).map(...), where the CoMapFunction maps the two input
types to a common type that you can then process uniformly.
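The Java sketch below is a dependency-free model of the connect-and-map idea. It is not Flink code, so it runs without Flink on the classpath. In Flink itself, the function passed to connect(...).map(...) is a CoMapFunction with one method per input, map1 and map2. The CoMap interface below only mirrors that shape. Click, Purchase, Keyed and ConnectModel are hypothetical names, and the model leaves out the five-minute tumbling window from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Dependency-free model of stream1.connect(stream2).map(...); NOT Flink code.
// Click and Purchase are hypothetical element types from two different topics.
record Click(String page) {}
record Purchase(String item, double price) {}

// Hypothetical common output type: the key is the original class name.
record Keyed(String key, long count) {}

// Mirrors the shape of Flink's CoMapFunction: one method per input type.
interface CoMap<A, B, R> {
    R map1(A a);
    R map2(B b);
}

class ConnectModel {
    // Maps both element types onto the common Keyed type.
    static final CoMap<Click, Purchase, Keyed> TO_KEYED = new CoMap<>() {
        public Keyed map1(Click c) { return new Keyed(Click.class.getSimpleName(), 1L); }
        public Keyed map2(Purchase p) { return new Keyed(Purchase.class.getSimpleName(), 1L); }
    };

    // Applies map1 to the first input and map2 to the second, producing one list
    // of the common type (the real arrival interleaving is ignored in this model).
    static <A, B, R> List<R> connectAndMap(List<A> in1, List<B> in2, CoMap<A, B, R> f) {
        List<R> out = new ArrayList<>();
        for (A a : in1) out.add(f.map1(a));
        for (B b : in2) out.add(f.map2(b));
        return out;
    }

    // Uniform downstream processing: count elements per key (like keyBy + sum).
    static Map<String, Long> countByKey(List<Keyed> in) {
        Map<String, Long> counts = new TreeMap<>();
        for (Keyed k : in) counts.merge(k.key(), k.count(), Long::sum);
        return counts;
    }

    // Two clicks and one purchase, counted by class name.
    static Map<String, Long> demo() {
        List<Click> clicks = List.of(new Click("/home"), new Click("/cart"));
        List<Purchase> purchases = List.of(new Purchase("book", 12.5));
        return countByKey(connectAndMap(clicks, purchases, TO_KEYED));
    }
}
```

Once both inputs share a type, union() also becomes an option, because it only accepts streams of the same type.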

For #3: there must always be a WindowAssigner specified. The API offers some
convenient ways to do this, such as timeWindow() or
window(TumblingProcessingTimeWindows.of(...)), but you always have to do it,
whether or not you provide your own trigger implementation. The assigner
decides which window(s) each element belongs to, while the trigger only decides
when a window fires, so a trigger cannot replace the assigner. The way to use
window(...) with a custom trigger is just:
 stream.keyBy(...).window(...).trigger(...).apply(...)
or something similar. Not sure if I answered your question, though.
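The Java sketch below models the two roles separately, to show why both pieces are needed. An assigner decides which window an element belongs to, and a trigger decides when that window fires. It is a simplified model, not Flink's API. Assigner, Trigger and WindowModel are hypothetical names. The everyN trigger is loosely modeled on Flink's built-in CountTrigger, which fires once its count is reached and resets the count without purging the window contents.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model (NOT Flink's API) of the two separate roles in
// stream.keyBy(...).window(assigner).trigger(trigger).apply(...).

// The assigner answers "which window does this element belong to?"
interface Assigner { long windowStart(long timestampMillis); }

// The trigger answers "should this window fire now?"
interface Trigger { boolean onElement(int elementsSinceLastFire); }

class WindowModel {
    // Tumbling windows of a fixed size (assumes non-negative timestamps).
    static Assigner tumbling(long sizeMillis) {
        return ts -> ts - (ts % sizeMillis);
    }

    // Fires every n elements without purging, loosely modeled on CountTrigger.
    static Trigger everyN(int n) {
        return count -> count >= n;
    }

    // Each element is a {timestamp, value} pair; returns the window contents at each firing.
    static List<String> run(long[][] elements, Assigner assigner, Trigger trigger) {
        Map<Long, List<Long>> contents = new LinkedHashMap<>();
        Map<Long, Integer> sinceFire = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (long[] e : elements) {
            long w = assigner.windowStart(e[0]);
            contents.computeIfAbsent(w, k -> new ArrayList<>()).add(e[1]);
            int c = sinceFire.merge(w, 1, Integer::sum);
            if (trigger.onElement(c)) {
                fired.add("window@" + w + " " + contents.get(w));
                sinceFire.put(w, 0); // reset the count, keep the window contents
            }
        }
        return fired;
    }
}
```

Swapping the trigger changes when output appears but not how elements are grouped. That grouping still has to come from an assigner. The model ignores end-of-window firing and state cleanup.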

For #4: if I understand you correctly, that is exactly what countWindow(10, 1)
already does. For example, if your input data were a sequence of integers
starting with 0, the output would be:

(0)
(0, 1)
(0, 1, 2)
(0, 1, 2, 3)
(0, 1, 2, 3, 4)
(0, 1, 2, 3, 4, 5)
(0, 1, 2, 3, 4, 5, 6)
(0, 1, 2, 3, 4, 5, 6, 7)
(0, 1, 2, 3, 4, 5, 6, 7, 8)
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
...
etc
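A small plain-Java simulation can reproduce the listing above. It assumes that countWindow(size, slide) on a KeyedStream is built from GlobalWindows, CountTrigger.of(slide) and CountEvictor.of(size), which matches the Flink sources as far as I know. Under that assumption, the trigger fires every slide elements and the evictor keeps only the newest size elements. SlidingCountModel is a hypothetical name. The code models a single key and is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Dependency-free model (NOT Flink code) of countWindow(size, slide) for one key:
// fire every `slide` elements, and keep only the newest `size` elements.
class SlidingCountModel {
    static List<List<Integer>> run(int size, int slide, List<Integer> input) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        int sinceFire = 0;
        for (int x : input) {
            buffer.addLast(x);
            // Evict the oldest elements so at most `size` remain. Evicting on arrival
            // yields the same emitted windows as evicting when the window fires.
            while (buffer.size() > size) buffer.removeFirst();
            if (++sinceFire >= slide) {
                fired.add(new ArrayList<>(buffer)); // snapshot of the current window
                sinceFire = 0;
            }
        }
        return fired;
    }
}
```

Running it with size 10, slide 1 and the inputs 0 to 13 gives the fourteen windows listed above. They grow from (0) to ten elements and then slide by one.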

-Jamie


On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitlabs@gmail.com> wrote:

> Hello people,
>
> I've written down some quick questions for which I couldn't find much or
> anything in the documentation. I hope you can answer some of them!
>
> *# Multiple consumers*
>
> *1.* Is it possible to .union() streams of different classes? That would be
> useful for creating a consumer that counts elements on different topics, for
> example, using a key such as the class name of the element and, let's say, a
> tumbling window of 5 mins.
>
> *2.* If #1 is not possible, I need to launch multiple consumers to
> achieve the same effect. However, I'm getting a "Factory already
> initialized" error if I run environment.execute() for two consumers on
> different threads. How do you .execute() more than one consumer in the same
> application?
>
> *# Custom triggers*
>
> *3.* If a custom .trigger() overrides the trigger of the WindowAssigner
> used previously, why do we have to specify a WindowAssigner (such as
> TumblingProcessingTimeWindows) in order to be able to specify a custom
> trigger? Shouldn't it be possible to pass a trigger to .window()?
>
> *4.* I need a stream with a CountWindow (size 10, slide 1, let's say) that
> may take more than 10 hours to fill for the first time, but in the meantime I
> want to process whatever elements have already been generated. I guess the way
> to do this is to create a custom trigger that fires on every new element, with
> up to 10 elements at a time. The result would be windows of sizes: 1 element,
> then 2, 3, ..., 9, 10, 10, 10, .... Is there a way to achieve this with
> predefined triggers, or is a custom trigger the only way to go here?
>
> Best regards,
> Matt
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com
