beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jérémie Vexiau (JIRA) <j...@apache.org>
Subject [jira] [Updated] (BEAM-2803) JdbcIO read is very slow when query return a lot of rows
Date Fri, 25 Aug 2017 09:53:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Jérémie Vexiau updated BEAM-2803:
---------------------------------
    Attachment: test2M.jpg

> JdbcIO read is very slow when query return a lot of rows
> --------------------------------------------------------
>
>                 Key: BEAM-2803
>                 URL: https://issues.apache.org/jira/browse/BEAM-2803
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>    Affects Versions: Not applicable
>            Reporter: Jérémie Vexiau
>            Assignee: Reuven Lax
>              Labels: performance
>             Fix For: Not applicable
>
>         Attachments: test1500K.png, test1M.png, test2M.jpg, test2M.png, test500k.png
>
>
> Hi,
> I'm using JdbcIO reader in batch mode with the postgresql driver.
> my select query return more than 5 Millions rows
> using cursors with Statement.setFetchSize().
> these ParDo are OK :
> {code:java}
>           .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
>           .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
>             private Random random;
>             @Setup
>             public void setup() {
>               random = new Random();
>             }
>             @ProcessElement
>             public void processElement(ProcessContext context) {
>               context.output(KV.of(random.nextInt(), context.element()));
>             }
>           }))
> {code}
> but reshuffle is very very slow. 
> it must be the GroupByKey with more than 5 millions of Key.
> {code:java}
> .apply(GroupByKey.<Integer, T>create())
> {code}
> is there a way to optimize the reshuffle, or use another method to prevent fusion ? 
> thanks in advance,
> edit: 
> I add some tests 
> I use google dataflow as runner, with 1 worker, 2 max, and workerMachineType n1-standard-2
> and  autoscalingAlgorithm THROUGHPUT_BASED
> First one : query return 500 000 results : 
> !test500k.png|thumbnail!
> as we can see,
>  parDo(Read) is about 1300 r/s
> groupByKey is about 1080 r/s
> 2nd : query return 1 000 000 results 
> !test1M.png|thumbnail!
> parDo(read) => 1480 r/s
> groupByKey => 634 r/s
> 3rd : query return 1 500 000 results
> !test1500K.png|thumbnail!
> parDo(read) => 1700 r/s
> groupByKey => 565 r/s
> 4th query return 2 000 000 results
> !test2M.png|thumbnail!
> parDo(read) => 1485 r/s
> groupByKey => 537 r/s
> As we can see, groupByKey  rate decrease when number of record are more important.
> ps:  2nd worker start just after ParDo(read) is succeed



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message