flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Urs Schoenenberger <urs.schoenenber...@tngtech.com>
Subject Re: stream partitioning to avoid network overhead
Date Fri, 11 Aug 2017 13:55:43 GMT
Hi Karthik,

maybe I'm misunderstanding, but there are a few things in your
description that seem strange to me:

- Your "slow" operator seems to be slow not because it's compute-heavy,
but because it's waiting for a response. Is AsyncIO (
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
) an option for you? It's a more natural approach to this issue compared
to raising the parallelism.

- A flink task slot can hold an instance of each operator. If you have
two (non-chained) operators, one with parallelism 1, one with
parallelism 2, you will use two slots, not three. You are not wasting
slots by having source parallelism 3 instead of 1.

- Therefore, it would in my opinion make sense to run all your operators
at the same parallelism. Operator chaining is desirable! Just run at the
max parallelism (the one you'd give to the webservice operator) and make
sure your Kafka topic has enough partitions to serve this parallelism.
If you're going the Async I/O way, you probably don't even need high
parallelism, but even with the sync implementation I can't think of a
big problem with running the chained operator at max parallelism.

- Minor point: I guess you have measured this, but I'm a little confused
why the intra-Flink network traffic would cause significant problems: It
seems to be the same amount of data that you're querying from the
external webservice, and that connection should be a bottleneck before
the intra-cluster network becomes one?


Best regards,
Urs

On 11.08.2017 04:54, Karthik Deivasigamani wrote:
> Hi,
> 
>    I have a use case where we read messages from a Kafka topic and invoke a
> webservice. The web-service call can take a take couple of seconds and then
> gives us back on avg 800KB of data. This data is set to another operator
> which does the parsing and then it gets sent to sink which saves the
> processed data in a NoSQL db. The app looks like this :
> 
> [image: Inline image 1]
> Since my payload from the web service is large a lot of data is transferred
> over the network and this is becoming a bottle neck.
> 
> Lets say *I have 6 slots per node and I would like to have 1 slot for
> source, 3 slots for web service calls, 2 for parser and 1 for my sink*.
> This way all the processing can happen locally and there is no network
> overhead. I have tried *stream.forward() *but it requires that the down
> stream operator has the same number of parallelism as the one emitting
> data. Next I tried *stream.rescale()* and that does not schedule the task
> as I would expect it given the parallelism's on the operators are all
> multiple of each other (my flink cluster has enough empty slots and
> capacity).
> 
> 
> [image: Inline image 2]
> 
> 
> Is there a way to schedule my task's in a fashion where there is no data
> transfer over the network. I was able to do this in apache storm by using
> localOrShuffle grouping. Not sure how to acehive the same in flink. Any
> pointers would be really helpful.
> 
> For now I have solved this problem by having the same parallelism on the
> web-service operator, parser, sink which causes flink to chain these
> operator together and execute them in the same thread.But ideally I would
> like to have more instances of the slow operator and less instances of my
> fast operator.
> 
> ~
> Karthik
> 
> 
> 
> Hi,
> 
>    I have a use case where we read messages from a Kafka topic and
> invoke a webservice. The web-service call can take a take couple of
> seconds and then gives us back on avg 800KB of data. This data is set to
> another operator which does the parsing and then it gets sent to sink
> which saves the processed data in a NoSQL db. The app looks like this :
> 
> Inline image 1
> Since my payload from the web service is large a lot of data is
> transferred over the network and this is becoming a bottle neck.
> 
> Lets say *I have 6 slots per node and I would like to have 1 slot for
> source, 3 slots for web service calls, 2 for parser and 1 for my sink*.
> This way all the processing can happen locally and there is no network
> overhead. I have tried *stream.forward() *but it requires that the down
> stream operator has the same number of parallelism as the one emitting
> data. Next I tried *stream.rescale()* and that does not schedule the
> task as I would expect it given the parallelism's on the operators are
> all multiple of each other (my flink cluster has enough empty slots and
> capacity).
> 
> 
> Inline image 2
> 
> 
> Is there a way to schedule my task's in a fashion where there is no data
> transfer over the network. I was able to do this in apache storm by
> using localOrShuffle grouping. Not sure how to acehive the same in
> flink. Any pointers would be really helpful.
> 
> For now I have solved this problem by having the same parallelism on the
> web-service operator, parser, sink which causes flink to chain these
> operator together and execute them in the same thread.But ideally I
> would like to have more instances of the slow operator and less
> instances of my fast operator.
> 
> ~
> Karthik

-- 
Urs Schönenberger - urs.schoenenberger@tngtech.com - +49 174 9955 692

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Mime
View raw message