flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Piotr Nowojski <pi...@ververica.com>
Subject Re: Backpressure tuning/failure
Date Thu, 10 Oct 2019 14:23:19 GMT
Hi Owen,

Thanks for the quick response. No, I haven’t seen the previous blog post, yes it clears
the things out a bit. 

> To clarify, the code is attempting to simulate a straggler node due to high load, which
therefore processes data at a slower rate - not a failing node. Some degree of this is a feature
of multi-tenant Hadoop. 

In your benchmark you are manually slowing down just one TaskManager, so you are testing for
the failing/slow machine case, where either:
	1. the machine is slow on it’s own because it’s smaller than the others,
	2. it’s overloaded by some service independent of the Flink 
	3. it's a failing node. 

Out of those three options, first two are not supported by Flink, in a sense that Flink assumes
more or less equal machines in the cluster. The third is, as I wrote in the previous response,
pretty uncommon scenario (until you reach really huge scale). How often one of your machine
fails in a way that it is 6.6 times slower than the others? I agree Flink doesn’t handle
this automatically at the moment (currently you would be expected to manually shut down the
machine). Nevertheless there are some plans how to address this (speculative execution and
load based balancing channel selection), but with no definite schedule.

Also if the issue is "multi-tenant Hadoop.”, I would first try to better assign resources
in the cluster, using for example CGroups via yarn/lxc/docker, or virtual machines.

Cheers, Piotrek

> On 10 Oct 2019, at 16:02, Owen Rees-Hayward <owenrh@googlemail.com> wrote:
> Hi Piotr,
> Thanks for getting back to me and for the info. I try to describe the motivation around
the scenarios in the original post in the series - see the 'Backpressure - why you might care'
section on http://owenrh.me.uk/blog/2019/09/30/ <http://owenrh.me.uk/blog/2019/09/30/>.
Maybe it could have been clearer.
> As you note, this will not affect every Flink job. However, one persons niche is another
persons day job. I definitely agree that keyed network exchanges, which is going to the majority
of analytics queries, are in a different problem space. However, this is not an uncommon scenario
in ingest pipelines.
> I'd be interested to know whether you saw the section in the post I referred to above
and whether this clears anything up? To clarify, the code is attempting to simulate a straggler
node due to high load, which therefore processes data at a slower rate - not a failing node.
Some degree of this is a feature of multi-tenant Hadoop. 
> Cheers, Owen
> On Thu, 10 Oct 2019 at 10:27, Piotr Nowojski <piotr@ververica.com <mailto:piotr@ververica.com>>
> Hi,
> I’m not entirely sure what you are testing. I have looked at your code (only the constant
straggler scenario) and please correct me if’m wrong, in your job you are basically measuring
throughput of `Thread.sleep(straggler.waitMillis)`.
> In the first RichMap task (`subTaskId == 0`), per every record you do the sleep(50ms),
so after filling in all of the network buffers  your whole job will be bottlenecked by this
throughput cap of 20 records / second. Every so often when this struggling task will be able
to process and free up some buffer from the backlog. This briefly unblocks other three tasks
(which are capped at 133 records / second). Apart from those short stints, those other tasks
can not process constant 133 records / seconds, because records are evenly distributed by
the source between all of those tasks. Which is I think clearly visible on the charts and
every system would behave in exactly the same way.
> But what scenario are you really trying to simulate? 
> A data skew when one task is 6.65 (133 / 20 ) times more overloaded/processing heavier
records than the others? Yes, this is expected behaviour, but your benchmark is testing this
in a bit convoluted way.
> A failing machine which has 6.65 times less performance? With keyed network exchanges
there is again very little that you can do (except of the speculative execution). Without
keyed network exchanges, OK, I agree. In this case, randomly/evenly distributing the records
is not the optimal shuffling strategy and there is some room for the improvement in Flink
(we could distribute records not randomly but to the less busy machines). However this is
a pretty much niche feature (failing machine + non keyed exchanges) and you are not saying
anywhere that this is what you are testing for.
> Piotrek
>> On 8 Oct 2019, at 18:10, Owen Rees-Hayward <owenrh@googlemail.com <mailto:owenrh@googlemail.com>>
>> Hi,
>> I am having a few issues with the Flink (v1.8.1) backpressure default settings, which
lead to poor throughput in a comparison I am doing between Storm, Spark and Flink.
>> I have a setup that simulates a progressively worse straggling task that Storm and
Spark cope with the relatively well. Flink not so much. Code can be found here - https://github.com/owenrh/flink-variance
>> See this throughput chart for the an idea of how badly - https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png
>> I do not have any production experience with Flink, but I have had a look at the
Flink docs and there is nothing in there that jumps out at me to explain or address this.
I presume I am missing something, as I cannot believe Flink is this weak in the face of stragglers.
It must be configuration right?
>> Would appreciate any help on this. I've got a draft blog post that I will publish
in a day or two, and don't want to criticise the Flink backpressure implementation for what
seems most likely some default configuration issue.
>> Thanks in advance, Owen
>> -- 
>> Owen Rees-Hayward
>> 07912 876046
>> twitter.com/owen4d <http://twitter.com/owen4d>
> -- 
> Owen Rees-Hayward
> 07912 876046
> twitter.com/owen4d <http://twitter.com/owen4d>

View raw message