From: Stefano Bortoli
Date: Tue, 29 Mar 2016 15:50:06 +0200
Subject: Re: threads, parallelism and task managers
To: user@flink.apache.org

Perhaps there is a misunderstanding on my side about parallelism and split management for a given data source.

We started from the current JDBCInputFormat to make it multi-threaded. Then, given a space of keys, we create the splits based on a fetch size set as a parameter. In the open call, we get a connection from the pool and execute a query using the split interval. This sets the 'resultSet', and then the DataSourceTask iterates over reachedEnd, next and close. On close, the connection is returned to the pool. We set the parallelism to 32 and would expect 32 open connections, but only 8 connections are opened.
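
The split creation described above can be sketched roughly as follows. This is only an illustration of cutting a key space into intervals of at most one fetch size; it is not the actual connector code, and the names KeyRangeSplits, Split and createSplits are hypothetical. It assumes numeric keys and inclusive bounds, and that the key range is small enough for maxKey - minKey not to overflow a long.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRangeSplits {

    /** Inclusive key interval [minKey, maxKey] that one split would query (hypothetical type). */
    static final class Split {
        final long minKey;
        final long maxKey;

        Split(long minKey, long maxKey) {
            this.minKey = minKey;
            this.maxKey = maxKey;
        }

        @Override
        public String toString() {
            return "[" + minKey + ", " + maxKey + "]";
        }
    }

    /** Cuts the inclusive key space [minKey, maxKey] into intervals of at most fetchSize keys. */
    static List<Split> createSplits(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            // Clamp the last interval so it never runs past maxKey.
            long end = (maxKey - start < fetchSize) ? maxKey : start + fetchSize - 1;
            splits.add(new Split(start, end));
            if (end == maxKey) {
                break;
            }
            start = end + 1;
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical key space 1..100 with a fetch size of 32.
        for (Split split : createSplits(1, 100, 32)) {
            System.out.println("split " + split);
        }
    }
}
```

Each interval would then become the bounds of one query in open, so the number of splits, not the parallelism, determines how many queries are issued in total.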

We tried to build an example with the TextInputFormat, but since it is a DelimitedInputFormat, open is called sequentially while the statistics are built, and the processing runs in parallel only after all the open calls have been executed. This is not feasible in our case, because there would be millions of queries before the statistics are collected.

Perhaps we are doing something wrong; we still have to figure out what. :-/
thanks a lot for your help.

saluti,
Stefano


2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
That is exactly my point. I should have 32 threads running, but I have only 8. 32 tasks are created, but only 8 run concurrently. Flavio and I will try to write a simple program to reproduce the problem. If we solve our issues on the way, we'll let you know.
thanks a lot anyway.

saluti,
Stefano

2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:

Then it shouldn’t be a problem. The ExecutionContext is used to run futures and their callbacks. But as Ufuk said, each task will spawn its own thread, and if you set the parallelism to 32 then you should have 32 threads running.

On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
In fact, I don't use it. I just had to trace back through the runtime implementation to find the point where the parallelism switched from 32 to 8.
saluti,
Stefano

2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:

Hi,

what do you use the ExecutionContext for? That should actually be something you shouldn’t need to be concerned with, since it is only used internally by the runtime.

Cheers,
Till


On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
Well, in theory yes. Each task has a thread, but only a certain number of them run in parallel (the job of the scheduler). Parallelism is set in the environment. However, although the parallelism parameter is set and read correctly, when it comes to actually starting the threads, the number is fixed at 8. We ran a debugger to get to the point where the thread was started. As Flavio mentioned, the ExecutionContext has its parallelism set to 8. We have a pool of connections to an RDBMS, and it logs the creation of just 8 connections although the parallelism is much higher.

My question is whether this is a bug (or a feature) of the LocalMiniCluster. :-) I am not a Scala expert, but I see some variable assignments in the setup of the MiniCluster involving parallelism and 'default values'. Default values in terms of parallelism are based on the number of cores.

thanks a lot for the support!

saluti,
Stefano

2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
Hey Stefano,

this should work by setting the parallelism on the environment, e.g.

env.setParallelism(32)

Is this what you are doing?

The task threads are not part of a pool, but each submitted task
creates its own Thread.
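
The difference between "one thread per task" and "tasks handed to a bounded pool" can be checked outside Flink with a small experiment. The following is a minimal sketch using only the JDK, not Flink code; the class and method names (PeakConcurrency, Gauge, peakWithThreadPerTask, peakWithFixedPool) are made up for illustration, and the short sleep is just a stand-in for query time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeakConcurrency {

    /** Tracks how many tasks are active at the same time and remembers the maximum. */
    static final class Gauge {
        private final AtomicInteger running = new AtomicInteger();
        private final AtomicInteger peak = new AtomicInteger();

        void enter() { peak.accumulateAndGet(running.incrementAndGet(), Math::max); }
        void exit() { running.decrementAndGet(); }
        int peak() { return peak.get(); }
    }

    /** One dedicated thread per task; each task stays active until all tasks have started. */
    static int peakWithThreadPerTask(int tasks) {
        Gauge gauge = new Gauge();
        CountDownLatch allStarted = new CountDownLatch(tasks);
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                gauge.enter();
                allStarted.countDown();
                try {
                    allStarted.await(); // hold the "connection" until every task is running
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    gauge.exit();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
        return gauge.peak();
    }

    /** The same number of tasks, submitted to a pool with a fixed number of worker threads. */
    static int peakWithFixedPool(int tasks, int poolSize) {
        Gauge gauge = new Gauge();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                gauge.enter();
                try {
                    Thread.sleep(20); // stand-in for query time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    gauge.exit();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
        return gauge.peak();
    }

    public static void main(String[] args) {
        System.out.println("thread per task: peak = " + peakWithThreadPerTask(32));
        System.out.println("fixed pool of 8: peak = " + peakWithFixedPool(32, 8));
    }
}
```

With dedicated threads the peak reaches the number of tasks, while the fixed pool can never exceed its pool size. Logging the same kind of counter from open and close in the input format would show which of the two patterns the job is actually following.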

– Ufuk


On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
<pompermaier@okkam.it> wrote:
> Any help here? I think that the problem is that the JobManager creates the
> executionContext of the scheduler with
>
>     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>
> and thus the number of concurrently running threads is limited to the number
> of cores (using the default constructor of the ForkJoinPool).
> What do you think?
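
The JDK behaviour this hypothesis relies on is easy to verify in isolation: a ForkJoinPool built with the no-argument constructor takes its parallelism from Runtime.availableProcessors(), while the int constructor accepts an explicit target. The sketch below only demonstrates that JDK fact; it does not show that Flink's task threads are bounded by this pool, and the class and method names are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;

final class ForkJoinDefaults {

    /** Parallelism of a pool built with the no-argument constructor. */
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /** Parallelism of a pool built with an explicit target parallelism. */
    static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available cores      = " + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool()   = " + defaultParallelism());
        System.out.println("new ForkJoinPool(32) = " + explicitParallelism(32));
    }
}
```

On an 8-core machine the first two values would both be 8, which matches the number seen in the debugger; whether that pool is what limits the running tasks is a separate question.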
>
>
> On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>>
>> Hi guys,
>>
>> I am trying to test a job that should run a number of tasks to read from an
>> RDBMS using an improved JDBC connector. The connection and the reading run
>> smoothly, but I cannot seem to move above the limit of 8
>> concurrent threads running. 8 is of course the number of cores of my
>> machine.
>>
>> I have tried working around configurations and settings, but the Executor
>> within the ExecutionContext keeps on having a parallelism of 8, although
>> the parallelism of the execution environment is of course much higher (in
>> fact I have many more tasks to be allocated).
>>
>> I feel it may be an issue of the LocalMiniCluster configuration, which may
>> just override/neglect my wish for a higher degree of parallelism. Is there a
>> way for me to work around this issue?
>>
>> please let me know. Thanks a lot for your help! :-)
>>
>> saluti,
>> Stefano
>
>
>





