flink-user mailing list archives

From Shailesh Jain <shailesh.j...@stellapps.com>
Subject Re: Correlation between data streams/operators and threads
Date Tue, 21 Nov 2017 08:42:56 GMT
Understood. Thanks a lot!

I'll try out the keyBy approach first.

Shailesh
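
For readers following the thread, here is a minimal sketch of why the keyBy approach keeps each device's events on one parallel subtask. It is a plain-Java simulation, not Flink code: the `Event` class, `subtaskFor`, and `route` are hypothetical names made up for illustration, and the simple hash-modulo routing is an assumption standing in for Flink's real key-group assignment, which differs in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyByRoutingSketch {

    /** Hypothetical event type: one reading from one device. */
    static final class Event {
        final String deviceId;
        final long timestamp;

        Event(String deviceId, long timestamp) {
            this.deviceId = deviceId;
            this.timestamp = timestamp;
        }
    }

    /** Picks a subtask index in [0, parallelism) from the key's hash (simplified). */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Groups events by the subtask that their device id hashes to. */
    static Map<Integer, List<Event>> route(List<Event> events, int parallelism) {
        Map<Integer, List<Event>> bySubtask = new TreeMap<>();
        for (Event e : events) {
            int subtask = subtaskFor(e.deviceId, parallelism);
            bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(e);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Five devices, three events each, interleaved as they might be in one topic.
        List<Event> events = new ArrayList<>();
        for (long t = 0; t < 3; t++) {
            for (int d = 1; d <= 5; d++) {
                events.add(new Event("device-" + d, t));
            }
        }
        // Route across 4 parallel subtasks and print what each one receives.
        route(events, 4).forEach((subtask, routed) -> {
            StringBuilder ids = new StringBuilder();
            for (Event e : routed) {
                ids.append(e.deviceId).append('@').append(e.timestamp).append(' ');
            }
            System.out.println("subtask " + subtask + ": " + ids.toString().trim());
        });
    }
}
```

Because the subtask depends only on the device id, every event of a given device lands on the same subtask regardless of which source instance read it. With five devices and a parallelism of 4, at least two devices necessarily share a subtask, which is why the per-device logic downstream still has to separate streams by key.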


On Tue, Nov 21, 2017 at 1:53 PM, Piotr Nowojski <piotr@data-artisans.com>
wrote:

>> So as long as the parallelism of my kafka source and sink operators is 1,
>> all the subsequent operators (multiple filters to create multiple streams,
>> and then individual CEP and Process operators per stream) will be executed
>> in the same task slot?
>
>
> Yes, unless you specify a different resource sharing group for the
> subsequent operators.
>
>> Regarding approach D, I'm not sure how this is different from the current
>> approach I had provided the code for above, and will it solve this problem
>> of different data streams not getting distributed across slots?
>
>
> The difference is huge. Without keyBy you cannot have multiple instances
> (parallelism > 1) of the source and filtering operators (unless you create
> a separate Kafka partition for each device, which in your case would solve
> a lot of problems, by the way). The solution you showed earlier will simply
> not scale beyond one machine. You could distribute your business logic
> across as many machines as you want, but the single source/filtering
> operators would always be a potential bottleneck. With keyBy you could have
> multiple source operators, and keyBy would ensure that events from the same
> device are always processed by one task/machine.
>
> Piotrek
>
> On 21 Nov 2017, at 07:39, Shailesh Jain <shailesh.jain@stellapps.com>
> wrote:
>
> Thanks for your time in helping me here.
>
> So as long as the parallelism of my kafka source and sink operators is 1,
> all the subsequent operators (multiple filters to create multiple streams,
> and then individual CEP and Process operators per stream) will be executed
> in the same task slot?
>
> I cannot take approach F as the entire business logic revolves around
> event timing.
>
> Regarding approach D, I'm not sure how this is different from the current
> approach I had provided the code for above, and will it solve this problem
> of different data streams not getting distributed across slots?
>
> Thanks again,
> Shailesh
>
> On Fri, Nov 17, 2017 at 3:01 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> Sorry for not responding but I was away.
>>
>> Regarding 1.
>>
>> One source operator, followed by multiple tasks with parallelism 1 (as
>> visible on your screenshot) that share a resource group, will collapse into
>> one task slot, so only one TaskManager will execute your entire job.
>>
>>
>> Because all of your events are written into one Kafka topic, the previously
>> proposed solutions A) (multiple jobs) and B) (one job with multiple
>> sources) cannot work. In that case what you have to do is either:
>>
>> D) set parallelism as you wish in the environment, read from Kafka, keyBy
>> device type, split the stream by filtering by device type (or using side
>> outputs), perform your logic
>>
>> This will create TOTAL_DEVICES data streams after the keyBy on each
>> machine, and the filtering will cost you (it will be linear in
>> TOTAL_DEVICES), but it should be the easiest solution.
>>
>> E) set parallelism as you wish, read from Kafka, keyBy device type, write
>> custom operators with custom logic handling watermarks using KeyedState
>>
>> However, I would strongly suggest reconsidering:
>>
>> F) ignore the whole issue of assigning different watermarks per device
>> stream and just assign the minimum across all of the devices. It would be
>> the easiest to implement.
>>
>> Piotrek
>>
>> > On 17 Nov 2017, at 09:22, Nico Kruber <nico@data-artisans.com> wrote:
>> >
>> > regarding 3.
>> > a) The taskmanager logs are missing, are there any?
>> > b) Also, the JobManager logs say you have 4 slots available in total - is
>> > this enough for your 5 devices scenario?
>> > c) The JobManager log, however, does not really reveal what it is currently
>> > doing, can you set the log level to DEBUG to see more?
>> > d) Also, do you still observe CPU load during the 15min as an indication
>> > that it is actually doing something?
>> > e) During this 15min period where apparently nothing happens, can you
>> > provide the output of "jstack <jobmanager_pid>" (with the PID of your
>> > JobManager)?
>> > f) You may further be able to debug into what is happening by running this
>> > in your IDE in debug mode and pausing the execution when you suspect it
>> > to hang.
>> >
>> >
>> > Nico
>> >
>> > On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:
>> >> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
>> >>
>> >> Piotrek
>> >>
>> >>> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.jain@stellapps.com>
>> >>> wrote:
>> >>>
>> >>> 3. Have attached the logs and exception raised (15min - configured akka
>> >>> timeout) after submitting the job.
>> >>>
>> >>> Thanks,
>> >>> Shailesh
>> >>>
>> >>>
>> >>> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <piotr@data-artisans.com>
>> >>> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> 3. Can you show the logs from job manager and task manager?
>> >>>
>> >>>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.jain@stellapps.com>
>> >>>> wrote:
>> >>>>
>> >>>> Hi Piotrek,
>> >>>>
>> >>>> I tried out option 'a' mentioned above, but instead of separate jobs, I'm
>> >>>> creating separate streams per device. Following is the test deployment
>> >>>> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
>> >>>>
>> >>>> akka.client.timeout 15 min
>> >>>> jobmanager.heap.mb 1024
>> >>>> jobmanager.rpc.address localhost
>> >>>> jobmanager.rpc.port 6123
>> >>>> jobmanager.web.port 8081
>> >>>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
>> >>>> metrics.reporter.jmx.port 8789
>> >>>> metrics.reporters jmx
>> >>>> parallelism.default 1
>> >>>> taskmanager.heap.mb 1024
>> >>>> taskmanager.memory.preallocate false
>> >>>> taskmanager.numberOfTaskSlots 4
>> >>>>
>> >>>> The number of Operators per device stream is 4 (one sink function, 3 CEP
>> >>>> operators).
>> >>>>
>> >>>> Observations (and questions):
>> >>>>
>> >>>> 3. Job deployment hangs (never switches to RUNNING) when the number of
>> >>>> devices is greater than 5. Increasing the akka client timeout does not
>> >>>> help. Would deploying separate jobs per device instead of separate
>> >>>> streams help here?
>> >>>>
>> >>>> Thanks,
>> >>>> Shailesh
>>
>>
>
>
