From: Robert Metzger
Date: Fri, 22 Jan 2016 14:34:42 +0100
Subject: Re: Backpressure in the context of JDBCOutputFormat update
To: "user@flink.apache.org"
Reply-To: user@flink.apache.org
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a11c3726a42cab10529ec49dd

--001a11c3726a42cab10529ec49dd
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: quoted-printable

Hi,

have you thought about making two independent jobs out of this (or calling
execute() separately for the two parts)? One job for the update() and one
for the insert()?

Even though the update operation should not be expensive, I think it's
helpful to understand the performance impact of concurrent inserts / updates
versus executing these operations sequentially.

Are the inserts / updates performed on the same table?

On Thu, Jan 21, 2016 at 4:17 PM, Maximilian Bode <
maximilian.bode@tngtech.com> wrote:

> Hi Robert,
>
> sorry, I should have been clearer in my initial mail. The two cases I was
> comparing are:
>
> 1) distinct() before insert (which is necessary as we have a unique key
> constraint in our database), no distinct() before update
> 2) distinct() before insert AND distinct() before update
>
> The test data used actually only contains unique values for the affected
> field though, so the dataset size is not reduced in case 2.
>
> In case 1 the insert does not start until all the data has arrived at
> distinct(), while the update is already going along (slowing down upstream
> operators as well). In case 2 both sinks wait for their respective
> distinct() operators (which are reached much faster now), then start at
> roughly the same time, leading to a shorter net job time for case 2
> compared to case 1.
>
> A pause operator might be useful, yes.
>
> The update should not be an inherently much more expensive operation, as
> the WHERE clause only contains the table's primary key.
>
> Cheers,
> Max
> --
> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com * 0176
> 1000 75 50
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>
> On 21.01.2016 at 15:57, Robert Metzger <rmetzger@apache.org> wrote:
>
> Hi Max,
>
> is the distinct() operation reducing the size of the DataSet? If so, I
> assume you have an idempotent update and the job is faster because fewer
> updates are done?
> If the distinct() operator is not changing anything, then the job might
> be faster because the INSERT is done while Flink is still executing the
> distinct() operation. So the insert is over when the updates are starting.
> This would mean that concurrent inserts and updates on the database are
> much slower than doing this sequentially.
>
> I'm wondering if there is a way in Flink to explicitly ask for spilling an
> intermediate operator to "pause" execution:
>
> Source ----- > (spill for pausing) ---> (update sink)
>         \
>          ------- > (insert)
>
> I don't have a lot of practical experience with RDBMS, but I guess updates
> are slower because an index lookup + update is necessary. Maybe optimizing
> the database configuration / schema / indexes is more promising. I think
> it's indeed much nicer to avoid any unnecessary steps in Flink.
>
> Did you do any "microbenchmarks" for the update and insert part? I guess
> that would help a lot to understand the impact of certain index structures,
> batching sizes, or database drivers.
>
> Regards,
> Robert
>
> On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <
> maximilian.bode@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them
>> (doing a database update) is performing slower than the other one (an
>> insert). The job as a whole is also slow, as upstream operators are slowed
>> down due to backpressure. I am able to speed up the whole job by
>> introducing an a priori unnecessary .distinct(), which of course blocks
>> downstream execution of the slow sink, which in turn seems to be able to
>> execute faster when given all data at once.
>>
>> Any ideas what is going on here? Is there something I can do without
>> introducing unnecessary computation steps?
>>
>> Cheers,
>> Max
>> --
>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com * 0176
>> 1000 75 50
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Registered office: Unterföhring * Amtsgericht München * HRB 135082

--001a11c3726a42cab10529ec49dd
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a11c3726a42cab10529ec49dd--