flink-user mailing list archives

From Siew Wai Yow <wai_...@hotmail.com>
Subject Re: Flink application does not scale as expected, please help!
Date Sat, 16 Jun 2018 09:46:20 GMT
Hi, there is an interesting finding: low parallelism performs much better because all tasks
run in the same TM. Once we scale out, the tasks are distributed across different TMs and
performance is worse than in the low-parallelism case. Is this expected? The more I scale,
the less I get?

________________________________
From: Siew Wai Yow <wai_yow@hotmail.com>
Sent: Saturday, June 16, 2018 5:09 PM
To: Jörn Franke
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!


Hi Jörn, the input data is 1 KB per record. In production there will be 10 billion records
per day, and that will grow, so scalability is very important for us to handle more data.
Unfortunately it does not work as expected even with only 10 million test records. The
test application is just a simple Jackson map plus an empty process function. CPU and memory
are not an issue, as we have 32 vCPUs and 100 GB RAM per TM. The network should be fine as
well: total TX+RX peaks at around 800 Mbps while we have 1000 Mbps. Would you mind sharing
your thoughts, or testing the attached application in your lab?
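
As a rough sanity check on these numbers, here is a small sketch that turns the daily volume into an average rate and payload bandwidth. It assumes 1 KB means 1000 bytes and that records arrive evenly over the day; the class and method names are made up for illustration and are not part of the attached application.

```java
public class ThroughputEstimate {
    static final long SECONDS_PER_DAY = 24L * 60 * 60;

    /** Average records per second needed to get through recordsPerDay in one day. */
    public static double recordsPerSecond(long recordsPerDay) {
        return (double) recordsPerDay / SECONDS_PER_DAY;
    }

    /** Payload bandwidth in megabits per second for a given rate and record size in bytes. */
    public static double megabitsPerSecond(double recordsPerSecond, long bytesPerRecord) {
        return recordsPerSecond * bytesPerRecord * 8 / 1_000_000.0;
    }

    public static void main(String[] args) {
        // 10 billion records per day at an assumed 1000 bytes per record
        double rate = recordsPerSecond(10_000_000_000L);
        System.out.printf("average rate: %.0f records/s%n", rate);
        System.out.printf("payload bandwidth: %.0f Mbit/s%n", megabitsPerSecond(rate, 1000));
    }
}
```

For 10 billion records per day this gives an average of about 115,741 records/s and roughly 926 Mbit/s of raw payload, before any serialization or framing overhead.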


Sample parameters to run the program:

"aggrinterval=6000000 loop=7500000 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.mycompany.com:8127"

  *   aggrinterval: time in ms for the timer to trigger
  *   loop: how many rows of data to feed
  *   statsd: send results to statsd
  *   psrc: source parallelism
  *   pJ2R: parallelism of the map operator (JsonRecTranslator)
  *   pAggr: parallelism of the process+timer operator (AggregationDuration)
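
For anyone who wants to script runs with different settings, here is a minimal sketch of reading a parameter string in this key=value form. How the attached application actually parses its arguments is not shown in this thread, so the RunParams class and its methods are purely hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunParams {
    /** Parses whitespace-separated key=value tokens into an ordered map. */
    public static Map<String, String> parse(String line) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String token : line.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected key=value, got: " + token);
            }
            params.put(token.substring(0, eq), token.substring(eq + 1));
        }
        return params;
    }

    /** Reads an integer parameter such as psrc, pJ2R or pAggr. */
    public static int intParam(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing parameter: " + key);
        }
        return Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(
            "aggrinterval=6000000 loop=7500000 statsd=1 psrc=4 pJ2R=32 pAggr=72 "
                + "URL=do36.mycompany.com:8127");
        System.out.println("source parallelism = " + intParam(p, "psrc"));
        System.out.println("map parallelism    = " + intParam(p, "pJ2R"));
        System.out.println("aggr parallelism   = " + intParam(p, "pAggr"));
    }
}
```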


Thank you!

Yow

________________________________
From: Jörn Franke <jornfranke@gmail.com>
Sent: Saturday, June 16, 2018 4:46 PM
To: Siew Wai Yow
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

How large is the input data? If the input data is very small, then it does not make sense to
scale it even more. The larger the data, the more parallelism you will get. You can, of course,
change this behavior by changing the partitioning of the Dataset.

On 16. Jun 2018, at 10:41, Siew Wai Yow <wai_yow@hotmail.com> wrote:


Hi,


We found that our Flink application, which has simple logic and uses a process function, does
not scale beyond a parallelism of 8, even with sufficient resources. Below is the result, which
is capped at ~250k TPS. No matter how we tune the parallelism of the operators, it just does
not scale; the same holds when we increase the source parallelism.


Please refer to "scaleNotWork.png":
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64 (performance is worse than with
parallelism 32)


Sample source code is attached (flink_app_parser_git.zip). It is a simple program that parses
each JSON record into an object and passes it to a Flink process function with empty logic.
RocksDB is in use, and the source data is generated by the program itself. This can be
reproduced easily.


We chose Flink because of its scalability, but that is not what we are seeing now. We would
appreciate it if anyone could help, as this is impacting our projects. Thank you.


Sample parameters to run the program:

"aggrinterval=6000000 loop=7500000 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.mycompany.com:8127"

  *   aggrinterval: time in ms for the timer to trigger
  *   loop: how many rows of data to feed
  *   statsd: send results to statsd
  *   psrc: source parallelism
  *   pJ2R: parallelism of the map operator (JsonRecTranslator)
  *   pAggr: parallelism of the process+timer operator (AggregationDuration)

We are running on VMware with 5 Task Managers, each with 32 slots.
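
Since each parallel subtask of an operator occupies its own slot, an operator with parallelism p cannot fit in fewer than ceil(p / 32) Task Managers in this setup. The sketch below only computes that lower bound; where the scheduler actually places subtasks is not something this arithmetic can tell, and the class name is made up for illustration.

```java
public class SlotSpan {
    /** Minimum number of Task Managers an operator must span, with one subtask per slot. */
    public static int minTaskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Integer ceiling division
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 32; // as configured in this test environment
        int[] parallelisms = {8, 16, 32, 64, 72};
        for (int p : parallelisms) {
            System.out.println("parallelism " + p + " needs at least "
                + minTaskManagers(p, slotsPerTaskManager) + " Task Manager(s)");
        }
    }
}
```

Parallelism 8, 16 and 32 can each fit in a single Task Manager, while 64 needs at least two and 72 at least three.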

Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 32
On-line CPU(s) list: 0-31
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 32
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 63
Model name: Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
Stepping: 2
CPU MHz: 2593.993
BogoMIPS: 5187.98
Hypervisor vendor: VMware
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 20480K
NUMA node0 CPU(s): 0-31
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts
mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts nopl
xtopology tsc_reliable nonstop_tsc aperfmperf pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2
x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm epb fsgsbase smep dtherm
ida arat pln pts

       total  used  free  shared  buff/cache  available
Mem:      98    24    72       0           1         72
Swap:      3     0     3


Please refer to TM.png and JM.png for further details.
The test was run without checkpointing enabled.


Thanks.


Regards,

Yow


<flink_app_parser_git.zip>
<JM.png>
<sample.png>
<scaleNotWork.png>
<TM.png>
