flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: threads, parallelism and task managers
Date Tue, 29 Mar 2016 13:50:06 GMT
Perhaps there is a misunderstanding on my side over the parallelism and
split management given a data source.

We started from the current JDBCInputFormat and made it multi-threaded. Then,
given a space of keys, we create the splits based on a fetch size set as a
parameter. In open, we get a connection from the pool and execute a
query using the split interval. This sets the 'resultSet', and then the
DataSourceTask iterates between reachedEnd, next and close. On close, the
connection is returned to the pool. We set parallelism to 32, and we would
expect 32 open connections, but only 8 connections are opened.
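
The split creation described above can be sketched as follows. This is only a minimal sketch under stated assumptions, not the actual connector code: the class KeyRangeSplits, the method split, and the idea that keys are contiguous long values read with an inclusive interval query are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRangeSplits {

    /**
     * Cuts the inclusive key space [minKey, maxKey] into consecutive
     * intervals of at most fetchSize keys. Each long[] is {low, high}.
     * Assumption: keys are plain long values and nowhere near Long.MAX_VALUE.
     */
    static List<long[]> split(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        for (long low = minKey; low <= maxKey; low += fetchSize) {
            long high = Math.min(low + fetchSize - 1, maxKey);
            splits.add(new long[] {low, high});
        }
        return splits;
    }

    public static void main(String[] args) {
        // Each interval would parameterize one query in open(), for example
        // "... WHERE id BETWEEN ? AND ?" (hypothetical column name).
        for (long[] s : split(1, 100, 30)) {
            System.out.println(s[0] + " .. " + s[1]);
        }
    }
}
```

If every open split holds exactly one connection, the number of connections open at once cannot exceed the number of splits, so the key space has to yield at least 32 splits before 32 connections can be expected.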

We tried to build an example with the TextInputFormat, but since it is a
DelimitedInputFormat, open is called sequentially while the statistics are
built, and the processing runs in parallel only after all the
open calls have been executed. This is not feasible in our case, because there would be
millions of queries before the statistics are collected.

Perhaps we are doing something wrong; we still have to figure out what. :-/
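
Two explanations come up in the thread quoted below: a ForkJoinPool created with its default constructor, and one dedicated thread per task. A standalone probe can tell them apart outside Flink. The following is a minimal sketch that assumes java.util.concurrent.ForkJoinPool; the class ConcurrencyProbe and its methods are hypothetical names, and nothing here shows which mechanism Flink actually uses for task threads.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrencyProbe {

    /** Counts tasks currently inside their body and remembers the highest count. */
    private static final class Gauge {
        private final AtomicInteger active = new AtomicInteger();
        private final AtomicInteger peak = new AtomicInteger();

        void enter() { peak.accumulateAndGet(active.incrementAndGet(), Math::max); }
        void exit() { active.decrementAndGet(); }
        int peak() { return peak.get(); }
    }

    /** One dedicated thread per task; every task waits until all have started. */
    static int peakWithThreads(int tasks) {
        Gauge gauge = new Gauge();
        CyclicBarrier allStarted = new CyclicBarrier(tasks);
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                gauge.enter();
                try {
                    allStarted.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                } finally {
                    gauge.exit();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return gauge.peak();
    }

    /** Blocking tasks on a ForkJoinPool with the given parallelism. */
    static int peakWithPool(int tasks, int parallelism) {
        Gauge gauge = new Gauge();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                gauge.enter();
                try {
                    Thread.sleep(50); // stands in for a blocking query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    gauge.exit();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return gauge.peak();
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("default pool parallelism: " + new ForkJoinPool().getParallelism());
        System.out.println("peak, 32 dedicated threads: " + peakWithThreads(32));
        System.out.println("peak, 32 tasks on pool of 8: " + peakWithPool(32, 8));
    }
}
```

Calling the same kind of gauge from open and close of the input format would show whether the 8 observed connections correspond to 8 tasks running at once or to something else, such as the pool itself.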

thanks a lot for your help.

saluti,
Stefano


2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:

> That is exactly my point. I should have 32 threads running, but I have
> only 8. 32 tasks are created, but only 8 are run concurrently. Flavio
> and I will try to write a simple program to reproduce the problem. If we solve
> our issues on the way, we'll let you know.
>
> thanks a lot anyway.
>
> saluti,
> Stefano
>
> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>
>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>> futures and their callbacks. But as Ufuk said, each task will spawn its
>> own thread, and if you set the parallelism to 32 then you should have 32
>> threads running.
>>
>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bortoli@gmail.com>
>> wrote:
>>
>>> In fact, I don't use it. I just had to trace back through the runtime
>>> implementation to get to the point where parallelism was switching from 32
>>> to 8.
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> what do you use the ExecutionContext for? That should actually be
>>>> something you don’t need to be concerned with, since it is only used
>>>> internally by the runtime.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>>> wrote:
>>>>
>>>>> Well, in theory yes. Each task has a thread, but only a certain number
>>>>> run in parallel (the job of the scheduler). Parallelism is set in the
>>>>> environment. However, although the parallelism parameter is set and read
>>>>> correctly, when it comes to actually starting the threads, the number is
>>>>> fixed to 8. We ran a debugger to get to the point where the thread was
>>>>> started. As Flavio mentioned, the ExecutionContext has the parallelism set
>>>>> to 8. We have a pool of connections to an RDBMS and it logs the creation of
>>>>> just 8 connections although parallelism is much higher.
>>>>>
>>>>> My question is whether this is a bug (or a feature) of the
>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>>>>> assignments in the setup of the MiniCluster, involving parallelism and
>>>>> 'default values'. Default values in terms of parallelism are based on the
>>>>> number of cores.
>>>>>
>>>>> thanks a lot for the support!
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>>>>
>>>>>> Hey Stefano,
>>>>>>
>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>>
>>>>>> env.setParallelism(32)
>>>>>>
>>>>>> Is this what you are doing?
>>>>>>
>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>> creates its own Thread.
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>> <pompermaier@okkam.it> wrote:
>>>>>> > Any help here? I think that the problem is that the JobManager creates the
>>>>>> > executionContext of the scheduler with
>>>>>> >
>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>>>>>> >
>>>>>> > and thus the number of concurrently running threads is limited to the number
>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>>>> > What do you think?
>>>>>> >
>>>>>> >
>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> Hi guys,
>>>>>> >>
>>>>>> >> I am trying to test a job that should run a number of tasks to read from a
>>>>>> >> RDBMS using an improved JDBC connector. The connection and the reading run
>>>>>> >> smoothly, but I cannot seem to be able to move above the limit of 8
>>>>>> >> concurrent threads running. 8 is of course the number of cores of my
>>>>>> >> machine.
>>>>>> >>
>>>>>> >> I have tried working around configurations and settings, but the Executor
>>>>>> >> within the ExecutionContext keeps on having a parallelism of 8, although, of
>>>>>> >> course, the parallelism of the execution environment is much higher (in fact
>>>>>> >> I have many more tasks to be allocated).
>>>>>> >>
>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration that may
>>>>>> >> just override/neglect my wish for higher degree of parallelism. Is there a
>>>>>> >> way for me to work around this issue?
>>>>>> >>
>>>>>> >> please let me know. Thanks a lot for your help! :-)
>>>>>> >>
>>>>>> >> saluti,
>>>>>> >> Stefano
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
