From: Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com>
Date: Thu, 2 May 2019 08:52:04 -0700
Subject: Re: Exceptions when launching counts on a Flink DataSet concurrently
To: Fabian Hueske <fhueske@apache.org>
Cc: Timo Walther <twalthr@apache.org>, user@flink.apache.org
Thanks for your answer Fabian.

In my opinion this is not just a possible new feature for an optimization, but a bigger problem, because the client program crashes with an exception when concurrent counts or collects are triggered on the same data set, and this happens non-deterministically depending on how the threads are scheduled. So that should be documented somewhere.

Just my two cents.

Thanks,

Juan

On Mon, Apr 29, 2019 at 02:02 Fabian Hueske <fhueske@apache.org> wrote:
Hi Juan,

count() and collect() trigger the execution of a job.
Since Flink does not cache intermediate results (yet), all operations from the sink (count()/collect()) to the sources are executed.
So in a sense a DataSet is immutable (given that the input of the sources does not change), but it is completely recomputed for every execution.
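
For illustration, here is a minimal sketch of what this means in practice, assuming the Scala DataSet API and a local environment (the variable names are made up for the example):

import org.apache.flink.api.scala._

// Each action triggers its own job, so the source and the map() below
// are re-evaluated once per action.
val env = ExecutionEnvironment.createLocalEnvironment(3)
val xs  = env.fromCollection(1 to 100).map(_ + 1)

val n1  = xs.count()   // job 1: source -> map -> count
val n2  = xs.count()   // job 2: the same pipeline, recomputed from scratch
val all = xs.collect() // job 3: recomputed again, result shipped to the client

// Once the result is materialized locally, further work on it happens in the
// client and nothing is recomputed by Flink.
val evens = all.count(_ % 2 == 0)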

There are currently some efforts [1] on the way to improve Flink's behavior for interactive sessions.

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
[2] https://lists.apache.org/thread.html/5f4961f1dfe23204631fd6f2b3227724ce9831f462737f51742a52c1@%3Cdev.flink.apache.org%3E
On Fri, Apr 26, 2019 at 17:03, Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote:
Hi Timo,

Thanks for your answer. I was surprised to have problems calling those methods concurrently, because I thought data sets were immutable. Now I understand that calling count or collect mutates the data set: not its contents, but some kind of execution plan included in the data set.

I suggest adding a remark about this lack of thread safety to the documentation. Maybe it's already there but I haven't seen it. I also understand that repeated calls to collect and count on the same data set are ok as long as they are done sequentially, and not concurrently.
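
In other words, my understanding is that something like the following is fine, while the concurrent variant is not (just a sketch based on my local runs):

import org.apache.flink.api.scala._
import scala.concurrent.Future

val env = ExecutionEnvironment.createLocalEnvironment(3)
val xs  = env.fromCollection(1 to 100).map(_ + 1)

// Sequential actions from a single thread: fine, each one just runs its own job.
val a = xs.count()
val b = xs.collect()

// Concurrent actions on the same data set: unsafe today, the plan translation
// can fail, e.g. with a ConcurrentModificationException (left commented out).
// Future { xs.count() }
// Future { xs.collect() }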

Thanks,

Juan

On Fri, Apr 26, 2019 at 02:00 Timo Walther <twalthr@apache.org> wrote:

Hi Juan,

as far as I know, we do not provide any concurrency guarantees for count() or collect(). Those methods need to be used with caution anyway, as the result size must not exceed a certain threshold. I will loop in Fabian, who might know more about the internals of the execution.
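
As an aside, for results that may be large, the usual alternative is to write to a sink and run a single job instead of collecting everything into the client; a sketch, assuming a writable output path chosen just for the example:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.createLocalEnvironment(3)
val xs  = env.fromCollection(1 to 100).map(_ + 1)

// Write the result to a file sink instead of materializing it in the client;
// the output path is only a placeholder.
xs.writeAsText("/tmp/xs-output", WriteMode.OVERWRITE)
env.execute("materialize xs")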

Regards,
Timo


On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:

Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote:
Hi,

I have a very simple program using the local execution environment that throws an NPE and other exceptions related to concurrent access when launching a count on a DataSet from different threads. The program is https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e, which is basically this:

def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map{_ + 1}
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
    println("pending action finished")
    Unit
  }
  Await.result(pendingActionsFinished, 10 seconds)

  ok
}

It looks like the issue is in OperatorTranslation.java at https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51, when a sink is added to the sinks list while that list is being traversed. I have the impression that this is by design, so I'd like to confirm that this is the expected behaviour, and whether this is happening only for the local execution environment, or if this affects all execution environment implementations. Other related questions I have are:
  • Is this documented somewhere? I'm quite new to Flink, so I might have missed it. Is there any known workaround for concurrently launching counts and other sink computations on the same DataSet? (One idea is sketched right after this list.)
  • Is it safe to perform a sequence of calls to DataSet sink methods like count or collect on the same DataSet, as long as they are performed from the same thread? From my experience it looks like it is, but I'd like to get a confirmation if possible.
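
The workaround I have in mind (only a sketch, assuming the behaviour above; the helper names are made up and I have not verified this beyond my local runs) is to funnel every sink action on a given environment through a single-threaded execution context, so the plan is never translated concurrently:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.api.scala.DataSet

// All actions share one single-threaded context, so count()/collect() on the
// same environment never overlap, while callers still get Futures back.
val sinkActionEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

def countAsync[T](ds: DataSet[T]): Future[Long] =
  Future(ds.count())(sinkActionEc)

def collectAsync[T](ds: DataSet[T]): Future[Seq[T]] =
  Future(ds.collect())(sinkActionEc)
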
This might be related to https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink but I'm not sure.

Thanks a lot for your help.

Greetings,

Juan

