flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Flink Kafka more consumers than partitions
Date Wed, 03 Aug 2016 10:14:06 GMT
Do you use a keyBy() between the source and the window operator?

One think I can think of is the following:

  - With the higher source parallelism, you have more logical connections
(each source rebalances across all window operators).
  - with source parallelism 20, you have 20 * 160 = 3200 logical
connections (fewer physical TCP connections, because Flink multiplexes)
  - with source parallelism 40, you have 40 * 160 = 6400 logical
connections.

With more logical connections, you need more network buffers. While you
seem to have enough buffers to make the 6400 connections work, it may be
just a bit to little to balance out some short lived skew/latency effects
in the network.
With the 3200 connections, each connection can claim twice the number of
buffers, giving it more elasticity to balance out network latency effects.

I would try to double the number of network buffers for the case where the
source has a parallelism of 40, and see if that helps.

Greetings,
Stephan


On Wed, Aug 3, 2016 at 12:07 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> Are you running on ProcessingTime or on EventTime?
>
> Thanks,
> Stephan
>
>
> On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <neo21_zerro@yahoo.com>
> wrote:
>
>> Hi guys,
>>
>> Thanks for getting back to me.
>>
>> So to clarify:
>>     Topology wise flink kafka source (does avro deserialization and small
>> map) -> window operator which does batching for 3 seconds -> hbase sink
>>
>> Experiments:
>>
>> 1. flink source: parallelism 40 (20 idle tasks) -> window operator:
>> parallelism 160 -> hbase sink: parallelism 160
>>     - roughly 10.000 requests/sec on hbase
>> 2. flink source: parallelism 20 -> window operator: parallelism 160 ->
>> hbase sink: parallelism 160
>>     - roughly 100.000 requests/sec on hbase (10x more)
>>
>> @Stephan as described below the parallelism of the sink was kept the
>> same. I agree with you that there is nothing to backpressue on the source
>> ;) However, my understanding right now is that only backpressure can be the
>> explanation for this situation. Since we just change the source
>> parallelism, other things like hbase parallelism  are kept the same.
>>
>> @Sameer all of those things are valid points. We make sure that we reduce
>> the row locking by partitioning the data on the hbase sinks. We are just
>> after why this limitation is happening. And since the same setup is used
>> but just the source parallelism is changed I don't expect this to be a
>> hbase issue.
>>
>> Thanks guys!
>>
>>
>>
>> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sameer@axiomine.com>
>> wrote:
>> What is the parallelism of the sink or the operator which writes to the
>> sinks in the first case. HBase puts are constrained by the following:
>> 1. How your regions are distributed. Are you pre-splitting your regions
>> for the table. Do you know the number of regions your Hbase tables are
>> split into.
>> 2. Are all the sinks writing to all the regions. Meaning are you getting
>> records in the sink operator which could potentially go to any region. This
>> can become a big bottleneck if you have 40 sinks talking to all regions. I
>> pre-split my regions based on key salting and use custom partitioning to
>> ensure each sink operator writes to only a few regions and my performance
>> shot up by several orders.
>> 3. You are probably doing this but adding puts in batches helps in
>> general but having each batch contain puts for too many regions hurts.
>>
>> If the source parallelism is the same as the parallelism of other
>> operators the 40 sinks communicating to all regions might be a problem.
>> When you go down to 20 sinks you actually might be getting better
>> performance due to lesser resource contention on HBase.
>>
>> Sent from my iPhone
>>
>>
>> > On Aug 3, 2016, at 4:14 AM, neo21 zerro <neo21_zerro@yahoo.com> wrote:
>> >
>> > Hello everybody,
>> >
>> > I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3
>> on YARN.
>> > In kafka I have a topic which have 20 partitions and my flink topology
>> reads from kafka (source) and writes to hbase (sink).
>> >
>> > when:
>> >     1. flink source has parallelism set to 40 (20 of the tasks are
>> idle) I see 10.000 requests/sec on hbase
>> >     2. flink source has parallelism set to 20 (exact number of
>> partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>> >
>> >
>> > It's clear that hbase is not the limiting factor in my topology.
>> > Assumption: Flink backpressure mechanism kicks in in the 1. case more
>> aggressively and it's limiting the ingestion of tuples in the topology.
>> >
>> > The question: In the first case, why are those 20 sources which are
>> sitting idle contributing so much to the backpressure?
>> >
>> >
>> > Thanks guys!
>>
>
>

Mime
View raw message