flink-user mailing list archives

From Jamie Grier <ja...@data-artisans.com>
Subject Re: Multiple consumers and custom triggers
Date Thu, 15 Dec 2016 23:21:41 GMT
All streams can be parallelized in Flink even with only one source.  You
can have multiple sinks as well.

On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
vr1meghashyam@gmail.com> wrote:

> 1. If we have multiple sources, can the streams be parallelized ?
> 2. Can we have multiple sinks as well?
>
> On Dec 14, 2016 10:46 PM, <dromitlabs@gmail.com> wrote:
>
>> Got it. Thanks!
>>
>> On Dec 15, 2016, at 02:58, Jamie Grier <jamie@data-artisans.com> wrote:
>>
>> Ahh, sorry, for #2: A single Flink job can have as many sources as you
>> like. They can be combined in multiple ways, via things like joins,
>> connect(), etc. They can also be completely independent; in other words,
>> the data flow graph can be completely disjoint. You never need to call
>> execute() more than once. Just define your program, with as many sources
>> as you want, and then call execute().
>>
>> val stream1 = env.addSource(...)
>> val stream2 = env.addSource(...)
>>
>> stream1
>>   .map(...)
>>   .addSink(...)
>>
>> stream2
>>   .map(...)
>>   .addSink(...)
>>
>> env.execute() // this is all you need
>>
>>
>> On Wed, Dec 14, 2016 at 4:02 PM, Matt <dromitlabs@gmail.com> wrote:
>>
>>> Hey Jamie,
>>>
>>> Ok with #1. I guess #2 is just not possible.
>>>
>>> I got it about #3. I just checked the code for the tumbling window
>>> assigner, and I noticed that a custom trigger only overrides the
>>> assigner's default trigger, not the way it assigns windows. It makes
>>> sense now.
>>>
>>> Regarding #4, after doing some more tests I think it's more complex than
>>> I first thought. I'll probably create another thread explaining that
>>> specific question in more detail.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <jamie@data-artisans.com>
>>> wrote:
>>>
>>>> For #1 there are a couple of ways to do this.  The easiest is probably
>>>> stream1.connect(stream2).map(...) where the MapFunction maps the two
>>>> input types to a common type that you can then process uniformly.
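The connect-and-map pattern described above can be sketched outside Flink. Below is a plain-Python simulation (not the Flink API; the helper names are invented for illustration) of mapping two differently-typed inputs to one common type so a single downstream operator can handle both:

```python
# Simulation only -- not the Flink API. Mimics the effect of
# stream1.connect(stream2).map(...): two differently-typed inputs are
# mapped to a single common record type.

def to_common(element):
    # CoMapFunction analogue: tag each element with its type name so
    # downstream logic can treat both inputs uniformly.
    return (type(element).__name__, element)

ints = [1, 2, 3]          # stands in for a stream of Ints
words = ["a", "b", "c"]   # stands in for a stream of Strings

merged = [to_common(e) for e in ints] + [to_common(e) for e in words]
print(merged[0])   # ('int', 1)
print(merged[-1])  # ('str', 'c')
```

In real Flink the same idea is a CoMapFunction whose two map methods both return the shared type.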
>>>>
>>>> For #3: There must always be a WindowAssigner specified.  There are some
>>>> convenient ways to do this in the API, such as timeWindow() or
>>>> window(TumblingProcessingTimeWindows.of(...)), but you must always do
>>>> this whether you provide your own trigger implementation or not.
>>>> The way to use window(...) with a custom trigger is just:
>>>>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
>>>> similar.  Not sure if I answered your question, though.
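The split between assigner and trigger in the pipeline above can be simulated in plain Python (not the Flink API; the function name is invented): the "assigner" groups elements into per-key buckets, while a per-element "trigger" decides when the window function fires.

```python
# Simulation only -- not the Flink API. The dict plays the role of the
# window assigner (one bucket per key); firing on every element plays
# the role of a custom trigger; window_fn is the window function.
from collections import defaultdict

def keyed_early_firing(stream, key_fn, window_fn):
    buckets = defaultdict(list)             # "assigner": group by key
    for element in stream:
        key = key_fn(element)
        buckets[key].append(element)
        yield key, window_fn(buckets[key])  # "trigger": fire per element

results = list(keyed_early_firing([1, 2, 3, 4], lambda x: x % 2, sum))
print(results)  # [(1, 1), (0, 2), (1, 4), (0, 6)]
```

The point of the sketch: swapping the trigger (the yield condition) never changes which bucket an element lands in, which is why Flink keeps the two concepts separate.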
>>>>
>>>> For #4: If I understand you correctly that is exactly what
>>>> CountWindow(10, 1) does already.  For example if your input data was a
>>>> sequence of integers starting with 0 the output would be:
>>>>
>>>> (0)
>>>> (0, 1)
>>>> (0, 1, 2)
>>>> (0, 1, 2, 3)
>>>> (0, 1, 2, 3, 4)
>>>> (0, 1, 2, 3, 4, 5)
>>>> (0, 1, 2, 3, 4, 5, 6)
>>>> (0, 1, 2, 3, 4, 5, 6, 7)
>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8)
>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
>>>> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>>>> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
>>>> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
>>>> ...
>>>> etc
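The sequence above can be reproduced with a small deque-based sketch (a simulation, not the Flink API; the function name is invented):

```python
# Simulation only -- not the Flink API. A deque with maxlen=size mimics
# CountWindow(10, 1): after every new element the current window
# contents are emitted, growing until the window is full, then sliding.
from collections import deque

def count_window(stream, size=10):
    window = deque(maxlen=size)
    for element in stream:
        window.append(element)
        yield tuple(window)

windows = list(count_window(range(14)))
print(windows[0])    # (0,)
print(windows[9])    # (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
print(windows[10])   # (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```

Note the early, partial windows before element 10 arrives, which is exactly the behavior asked about in question #4.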
>>>>
>>>> -Jamie
>>>>
>>>>
>>>> On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitlabs@gmail.com> wrote:
>>>>
>>>>> Hello people,
>>>>>
>>>>> I've written down some quick questions for which I couldn't find much
>>>>> or anything in the documentation. I hope you can answer some of them!
>>>>>
>>>>> *# Multiple consumers*
>>>>>
>>>>> *1.* Is it possible to .union() streams of different classes? It would
>>>>> be useful, for example, to create a consumer that counts elements on
>>>>> different topics, using a key such as the class name of the element and
>>>>> a tumbling window of, say, 5 minutes.
>>>>>
>>>>> *2.* In case #1 is not possible, I need to launch multiple consumers
>>>>> to achieve the same effect. However, I'm getting a "Factory already
>>>>> initialized" error if I run environment.execute() for two consumers on
>>>>> different threads. How do you .execute() more than one consumer in the
>>>>> same application?
>>>>>
>>>>> *# Custom triggers*
>>>>>
>>>>> *3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
>>>>> used previously, why do we have to specify a WindowAssigner (such as
>>>>> TumblingProcessingTimeWindows) in order to be able to specify a custom
>>>>> trigger? Shouldn't it be possible to send a trigger to .window()?
>>>>> *4.* I need a stream with a CountWindow (size 10, slide 1, let's say)
>>>>> that may take more than 10 hours to fill for the first time, but in the
>>>>> meanwhile I want to process whatever elements have already been
>>>>> generated. I guess the way to do this is to create a custom trigger that
>>>>> fires on every new element, with up to 10 elements at a time. The result
>>>>> would be windows of sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10, ....
>>>>> Is there a way to achieve this with predefined triggers, or is a custom
>>>>> trigger the only way to go here?
>>>>>
>>>>> Best regards,
>>>>> Matt
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Jamie Grier
>>>> data Artisans, Director of Applications Engineering
>>>> @jamiegrier <https://twitter.com/jamiegrier>
>>>> jamie@data-artisans.com
>>>>
>>>>
>>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> jamie@data-artisans.com
>>
>>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com
