flink-issues mailing list archives

From "Sihua Zhou (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
Date Wed, 13 Jun 2018 09:17:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510828#comment-16510828
] 

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], I haven't seen the email you sent yet, but I just had a look at your code, and I think
the "non-scalable" behavior might be caused by your test code. From your code we can see that
the source's parallelism is always the same as that of the other operators. In each sub-task
of the source you use a loop to mock the source records, which means the QPS of the source
increases whenever you scale up the parallelism of your job. In the end you haven't scaled
up anything at all, because the input load grows together with the processing capacity. I would
suggest setting the parallelism of the source to a fixed value (e.g. 4) and then scaling up
the job, to see whether it is scalable. I haven't tested your code on a cluster yet; I will
test it later. My email is "summerleafs@163.com"; if you have problems sending email to
"user@flink.apache.org", you can send it to me personally if you want. Thanks~
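
To make the argument above concrete, here is a small back-of-the-envelope model in plain
Java (no Flink dependency). Everything in it is an assumption for illustration: the class
name SourceRateModel, the per-sub-task rate of 10000 records/s, and the even spread of keys
across sub-tasks are made up and are not taken from the reporter's test code.

```java
public class SourceRateModel {

    /** Total records/second emitted by all source sub-tasks combined. */
    public static long totalSourceRate(long perSourceSubtaskRate, int sourceParallelism) {
        return perSourceSubtaskRate * sourceParallelism;
    }

    /**
     * Records/second that each downstream sub-task has to process,
     * assuming (hypothetically) that keys are spread evenly.
     */
    public static double loadPerOperatorSubtask(long perSourceSubtaskRate,
                                                int sourceParallelism,
                                                int operatorParallelism) {
        return (double) totalSourceRate(perSourceSubtaskRate, sourceParallelism)
                / operatorParallelism;
    }

    public static void main(String[] args) {
        long rate = 10_000; // hypothetical rate of one looping source sub-task
        for (int p : new int[] {4, 8, 16}) {
            // "coupled": source parallelism follows the job parallelism (as in the test code)
            // "fixed":   source parallelism pinned to 4 (as suggested above)
            System.out.printf("parallelism=%d coupled=%.0f fixed=%.0f%n",
                    p,
                    loadPerOperatorSubtask(rate, p, p),
                    loadPerOperatorSubtask(rate, 4, p));
        }
    }
}
```

In this model the "coupled" load per sub-task stays at 10000 records/s for every parallelism,
so no speed-up can be observed, while the "fixed" load halves each time the parallelism doubles.
In a Flink job the source can be pinned by calling setParallelism(4) on the source operator.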

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, input_stop_when_timer_run.png,
keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when ReducingState.add is
used in the source code. In the test, checkpointing is disabled and the filesystem (HDFS) is
used as the state backend.
> It can be easily reproduced with a simple app, without checkpointing, that simply keeps
storing records, with a simple reduction function (in fact an empty function shows
the same result). Any idea would be appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how many records
per second pass through "JsonTranslator", which is shown in the graph. The only difference
between the two runs is 1 line: commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy() // key selector omitted in the original report
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     // keyed state; initialization (e.g. in open()) is not shown in the report
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
