From: Flavio Pompermaier
Date: Thu, 7 Apr 2016 12:37:50 +0200
Subject: Re: threads, parallelism and task managers
To: user@flink.apache.org

We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and batches of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
The first time you run the program, you have to comment out the following line:

        stmt.executeUpdate("Drop Table users ");

In your pom.xml, declare the following dependencies:

<dependency>
  <groupId>org.apache.derby</groupId>
  <artifactId>derby</artifactId>
  <version>10.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.4.2</version>
</dependency>

On my laptop I have 8 cores, and if I set the parallelism to 16, I expect to see 16 calls to the connection pool (i.e., 16 occurrences of '==================== CREATING NEW CONNECTION!'), but I see only 8 (capped at my number of cores).
The number of created tasks, instead, is correct (16).

I hope this helps in understanding where the problem is!

Best, and thanks in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
Hi Ufuk,

Here is our preliminary input format implementation:
https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

If you need a running project, I will have to create a test one, because I cannot share the current configuration.

Thanks a lot in advance!



2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uce@apache.org>:
Do you have the code somewhere online? Maybe someone can have a quick
look over it later. I'm pretty sure this is indeed a problem with the custom input format.

– Ufuk

On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
> Perhaps there is a misunderstanding on my side about the parallelism and
> split management of a data source.
>
> We started from the current JDBCInputFormat to make it multi-threaded. Then,
> given a space of keys, we create the splits based on a fetch size set as a
> parameter. In open(), we get a connection from the pool and execute a
> query using the split interval. This sets the 'resultSet', and then the
> DataSourceTask loops over reachedEnd() and nextRecord() and finally calls close().
> On close, the connection is returned to the pool. We set the parallelism to 32,
> and we would expect 32 connections to be opened, but only 8 are opened.
>
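The splitting step described above can be sketched as follows, assuming an integer key column whose values span [minKey, maxKey] and one split per fetchSize consecutive keys. KeyRangeSplits, Split and createSplits are hypothetical names, not part of Flink's JDBCInputFormat; in a real input format, intervals like these would presumably be produced by createInputSplits() and queried one at a time in open().

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRangeSplits {

    // Hypothetical split type: an inclusive key interval [start, end] that one
    // open() call would turn into a bounded query (e.g. "... WHERE id BETWEEN ? AND ?").
    public static final class Split {
        public final long start;
        public final long end;

        Split(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Cuts the key space [minKey, maxKey] into consecutive splits of at most
    // fetchSize keys each; the last split may be shorter.
    // Assumes maxKey + fetchSize fits in a long, otherwise start would overflow.
    public static List<Split> createSplits(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += fetchSize) {
            long end = Math.min(start + fetchSize - 1, maxKey);
            splits.add(new Split(start, end));
        }
        return splits;
    }

    public static void main(String[] args) {
        // 1000 keys cut into batches of 10, roughly mirroring the Derby example.
        List<Split> splits = createSplits(1, 1000, 10);
        System.out.println(splits.size());                 // 100
        System.out.println(splits.get(0));                 // [1, 10]
        System.out.println(splits.get(splits.size() - 1)); // [991, 1000]
    }
}
```

With this scheme the number of splits, not the parallelism, decides how many queries are issued in total; the parallelism only bounds how many of them can run at the same time.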
> We tried to make an example with the TextInputFormat, but since it is a
> DelimitedInputFormat, open() is called sequentially while the statistics are
> built, and the processing is executed in parallel only after all the
> open() calls have completed. This is not feasible in our case, because there
> would be millions of queries before the statistics are collected.
>
> Perhaps we are doing something wrong; we still have to figure out what. :-/
>
> Thanks a lot for your help.
>
> saluti,
> Stefano
>
>
> 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>
>> That is exactly my point. I should have 32 threads running, but I have
>> only 8. 32 tasks are created, but only 8 run concurrently. Flavio
>> and I will try to write a simple program that reproduces the problem. If we
>> solve our issues along the way, we'll let you know.
>>
>> Thanks a lot anyway.
>>
>> saluti,
>> Stefano
>>
>> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>
>>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>>> futures and their callbacks. But as Ufuk said, each task will spawn its own
>>> thread, and if you set the parallelism to 32, then you should have 32 threads
>>> running.
>>>
>>>
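To illustrate the thread-per-task model Till and Ufuk describe, here is a simplified, hypothetical stand-in (not Flink's actual Task code) that gives each task a dedicated Thread and checks that all of them are running at the same moment, even when there are more tasks than cores:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerTask {

    // Starts one dedicated Thread per task and returns how many were running
    // at the same time, or -1 if they did not all start within the timeout.
    public static int peakConcurrency(int parallelism) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allStarted = new CountDownLatch(parallelism);
        CountDownLatch release = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < parallelism; i++) {
            Thread t = new Thread(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                allStarted.countDown();
                try {
                    // Block like a task waiting on I/O (e.g. a JDBC query) until released.
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }, "task-" + i);
            threads.add(t);
            t.start();
        }

        try {
            // Succeeds only if every task got its own thread; a fixed-size pool capped
            // at the core count would leave the surplus tasks queued and time out here.
            boolean allRunning = allStarted.await(10, TimeUnit.SECONDS);
            release.countDown();
            for (Thread t : threads) {
                t.join();
            }
            return allRunning ? peak.get() : -1;
        } catch (InterruptedException e) {
            release.countDown();
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("peak concurrent tasks: " + peakConcurrency(32)); // 32
    }
}
```

Under this model, a hard limit of 8 concurrent reads would have to come from something the tasks share, not from the number of task threads themselves.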
>>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>> wrote:
>>>>
>>>> In fact, I don't use it. I just had to trace back through the runtime
>>>> implementation to find the point where the parallelism was switching from 32
>>>> to 8.
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>>>
>>>>> Hi,
>>>>>
>>>>> What do you use the ExecutionContext for? That should actually be
>>>>> something you shouldn’t need to be concerned with, since it is only used
>>>>> internally by the runtime.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>>
>>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Well, in theory, yes. Each task has a thread, but only some of them run
>>>>>> in parallel (that is the job of the scheduler). The parallelism is set in the
>>>>>> environment. However, although the parallelism parameter is set and read
>>>>>> correctly, when it comes to actually starting the threads, the number is
>>>>>> fixed at 8. We ran a debugger to get to the point where the thread was
>>>>>> started. As Flavio mentioned, the ExecutionContext has its parallelism set
>>>>>> to 8. We have a pool of connections to an RDBMS, and it logs the creation of
>>>>>> just 8 connections, although the parallelism is much higher.
>>>>>>
>>>>>> My question is whether this is a bug (or a feature) of the
>>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>>>>>> assignments in the setup of the MiniCluster involving parallelism and
>>>>>> 'default values'. The default values for parallelism are based on the
>>>>>> number of cores.
>>>>>>
>>>>>> Thanks a lot for the support!
>>>>>>
>>>>>> saluti,
>>>>>> Stefano
>>>>>>
>>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>>>>>>
>>>>>>> Hey Stefano,
>>>>>>>
>>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>>>
>>>>>>> env.setParallelism(32)
>>>>>>>
>>>>>>> Is this what you are doing?
>>>>>>>
>>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>>> creates its own Thread.
>>>>>>>
>>>>>>> – Ufuk
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>>> <pompermaier@okkam.it> wrote:
>>>>>>> > Any help here? I think the problem is that the JobManager
>>>>>>> > creates the executionContext of the scheduler with
>>>>>>> >
>>>>>>> >         val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>>>>>>> >
>>>>>>> > and thus the number of concurrently running threads is limited to
>>>>>>> > the number of cores (because the default constructor of the ForkJoinPool
>>>>>>> > is used). What do you think?
>>>>>>> >
>>>>>>> >
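The premise of Flavio's hypothesis can be checked in isolation with the JDK's java.util.concurrent.ForkJoinPool (the pool Flink's Scala code instantiates may be a different ForkJoinPool class; that is an assumption not verified here). The no-argument constructor sets the target parallelism to Runtime.availableProcessors(), while the int constructor takes an explicit value. Note that getParallelism() reports a target level rather than a hard cap on threads, and whether this pool bounds Flink's task threads at all is exactly what the replies in this thread dispute:

```java
import java.util.concurrent.ForkJoinPool;

public class ForkJoinParallelism {

    // Target parallelism of a pool built with the no-argument constructor.
    public static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Target parallelism of a pool built with an explicit size.
    public static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + cores);
        // Equals the processor count (8 on the 8-core machines in this thread).
        System.out.println("new ForkJoinPool():   " + defaultParallelism());
        System.out.println("new ForkJoinPool(32): " + explicitParallelism(32)); // 32
    }
}
```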
>>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>>>>> > <s.bortoli@gmail.com>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hi guys,
>>>>>>> >>
>>>>>>> >> I am trying to test a job that should run a number of tasks to read from an
>>>>>>> >> RDBMS using an improved JDBC connector. The connection and the reading run
>>>>>>> >> smoothly, but I cannot seem to move above the limit of 8 concurrently
>>>>>>> >> running threads. 8 is, of course, the number of cores of my machine.
>>>>>>> >>
>>>>>>> >> I have tried tweaking configurations and settings, but the Executor
>>>>>>> >> within the ExecutionContext keeps having a parallelism of 8, although
>>>>>>> >> the parallelism of the execution environment is, of course, much higher
>>>>>>> >> (in fact, I have many more tasks to be allocated).
>>>>>>> >>
>>>>>>> >> I feel it may be an issue with the LocalMiniCluster configuration, which may
>>>>>>> >> just override or ignore my wish for a higher degree of parallelism. Is there a
>>>>>>> >> way for me to work around this issue?
>>>>>>> >>
>>>>>>> >> Please let me know. Thanks a lot for your help! :-)
>>>>>>> >>
>>>>>>> >> saluti,
>>>>>>> >> Stefano
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

