flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: threads, parallelism and task managers
Date Wed, 13 Apr 2016 09:46:00 GMT
No problem ;-)

On Wed, Apr 13, 2016 at 11:38 AM, Stefano Bortoli <s.bortoli@gmail.com>
wrote:

> Sounds like you are damn right! Thanks for the insight, dumb of us for not
> checking this before.
>
> saluti,
> Stefano
>
> 2016-04-13 11:05 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
>> This actually does not sound like a Flink issue. I would look into the
>> commons pool docs.
>> Maybe they size their pools by default with the number of cores, so the
>> pool has only 8 threads, and other requests are queued?
>>
>> On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> Any feedback on our JDBC InputFormat issue?
>>>
>>> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> We've finally created a running example (for Flink 0.10.2) of our
>>>> improved JDBC InputFormat that you can run from an IDE (it creates an
>>>> in-memory Derby database with 1000 rows and a batch size of 10) at
>>>> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>>>> The first time you run the program you have to comment out the following
>>>> line:
>>>>
>>>>         stmt.executeUpdate("Drop Table users ");
>>>>
>>>> In your pom declare the following dependencies:
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.derby</groupId>
>>>>   <artifactId>derby</artifactId>
>>>>   <version>10.10.1.1</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.commons</groupId>
>>>>   <artifactId>commons-pool2</artifactId>
>>>>   <version>2.4.2</version>
>>>> </dependency>
>>>>
>>>> My laptop has 8 cores, and if I set the parallelism to 16 I expect to
>>>> see 16 calls to the connection pool (i.e. '==================== CREATING
>>>> NEW CONNECTION!'), while I see only 8 (up to my maximum number of cores).
>>>> The number of created tasks, however, is correct (16).
>>>>
>>>> I hope this helps in understanding where the problem is!
>>>>
>>>> Best, and thanks in advance,
>>>> Flavio
>>>>
>>>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bortoli@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> here is our preliminary input format implementation:
>>>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>>>
>>>>> If you need a running project, I will have to create a test one because
>>>>> I cannot share the current configuration.
>>>>>
>>>>> thanks a lot in advance!
>>>>>
>>>>>
>>>>>
>>>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>>>>
>>>>>> Do you have the code somewhere online? Maybe someone can have a quick
>>>>>> look over it later. I'm pretty sure that is indeed a problem with the
>>>>>> custom input format.
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>>>>> wrote:
>>>>>> > Perhaps there is a misunderstanding on my side about parallelism and
>>>>>> > split management for a given data source.
>>>>>> >
>>>>>> > We started from the current JDBCInputFormat to make it multi-threaded.
>>>>>> > Then, given a space of keys, we create the splits based on a fetchsize
>>>>>> > set as a parameter. In open, we get a connection from the pool and
>>>>>> > execute a query using the split interval. This sets the 'resultSet',
>>>>>> > and then the DataSourceTask iterates between reachedEnd, next and
>>>>>> > close. On close, the connection is returned to the pool. We set the
>>>>>> > parallelism to 32, and we would expect 32 connections to be opened, but
>>>>>> > only 8 are.
>>>>>> >
>>>>>> > We tried to make an example with the TextInputFormat, but since it is a
>>>>>> > DelimitedInputFormat, open is called sequentially while the statistics
>>>>>> > are built, and the processing is executed in parallel only after all
>>>>>> > the open calls have run. This is not feasible in our case, because
>>>>>> > there would be millions of queries before the statistics are collected.
>>>>>> >
>>>>>> > Perhaps we are doing something wrong, but we still have to figure out
>>>>>> > what. :-/
>>>>>> >
>>>>>> > thanks a lot for your help.
>>>>>> >
>>>>>> > saluti,
>>>>>> > Stefano
>>>>>> >
>>>>>> >
>>>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>>>>> >>
>>>>>> >> That is exactly my point. I should have 32 threads running, but I
>>>>>> >> have only 8. 32 tasks are created, but only 8 are run concurrently.
>>>>>> >> Flavio and I will try to make a simple program to reproduce the
>>>>>> >> problem. If we solve our issues on the way, we'll let you know.
>>>>>> >>
>>>>>> >> thanks a lot anyway.
>>>>>> >>
>>>>>> >> saluti,
>>>>>> >> Stefano
>>>>>> >>
>>>>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>> >>>
>>>>>> >>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>>>>>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>>>>>> >>> its own thread, and if you set the parallelism to 32 then you should
>>>>>> >>> have 32 threads running.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli
>>>>>> >>> <s.bortoli@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> In fact, I don't use it. I just had to crawl back through the
>>>>>> >>>> runtime implementation to get to the point where the parallelism
>>>>>> >>>> was switching from 32 to 8.
>>>>>> >>>>
>>>>>> >>>> saluti,
>>>>>> >>>> Stefano
>>>>>> >>>>
>>>>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>>>> >>>>>
>>>>>> >>>>> Hi,
>>>>>> >>>>>
>>>>>> >>>>> what do you use the ExecutionContext for? It should actually be
>>>>>> >>>>> something you don’t need to be concerned with, since it is only
>>>>>> >>>>> used internally by the runtime.
>>>>>> >>>>>
>>>>>> >>>>> Cheers,
>>>>>> >>>>> Till
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli
>>>>>> >>>>> <s.bortoli@gmail.com> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> Well, in theory yes. Each task has a thread, but only a certain
>>>>>> >>>>>> number are run in parallel (the job of the scheduler).
>>>>>> >>>>>> Parallelism is set in the environment. However, whereas the
>>>>>> >>>>>> parallelism parameter is set and read correctly, when it comes
>>>>>> >>>>>> to actually starting the threads, the number is fixed at 8. We
>>>>>> >>>>>> ran a debugger to get to the point where the thread was started.
>>>>>> >>>>>> As Flavio mentioned, the ExecutionContext has the parallelism
>>>>>> >>>>>> set to 8. We have a pool of connections to an RDBMS and it logs
>>>>>> >>>>>> the creation of just 8 connections, although the parallelism is
>>>>>> >>>>>> much higher.
>>>>>> >>>>>>
>>>>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>>>>> >>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some
>>>>>> >>>>>> variable assignments in the setup of the MiniCluster involving
>>>>>> >>>>>> parallelism and 'default values'. Default values in terms of
>>>>>> >>>>>> parallelism are based on the number of cores.
>>>>>> >>>>>>
>>>>>> >>>>>> thanks a lot for the support!
>>>>>> >>>>>>
>>>>>> >>>>>> saluti,
>>>>>> >>>>>> Stefano
>>>>>> >>>>>>
>>>>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Hey Stefano,
>>>>>> >>>>>>>
>>>>>> >>>>>>> this should work by setting the parallelism on the environment,
>>>>>> >>>>>>> e.g.
>>>>>> >>>>>>>
>>>>>> >>>>>>> env.setParallelism(32)
>>>>>> >>>>>>>
>>>>>> >>>>>>> Is this what you are doing?
>>>>>> >>>>>>>
>>>>>> >>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>> >>>>>>> creates its own Thread.
>>>>>> >>>>>>>
>>>>>> >>>>>>> – Ufuk
>>>>>> >>>>>>>
>>>>>> >>>>>>>
>>>>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>> >>>>>>> <pompermaier@okkam.it> wrote:
>>>>>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>>>>>> >>>>>>> > creates the executionContext of the scheduler with
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >        val executionContext =
>>>>>> >>>>>>> >          ExecutionContext.fromExecutor(new ForkJoinPool())
>>>>>> >>>>>>> >
>>>>>> >>>>>>> > and thus the number of concurrently running threads is limited
>>>>>> >>>>>>> > to the number of cores (using the default constructor of the
>>>>>> >>>>>>> > ForkJoinPool). What do you think?
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>>>> >>>>>>> > <s.bortoli@gmail.com> wrote:
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> Hi guys,
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I am trying to test a job that should run a number of tasks
>>>>>> >>>>>>> >> to read from an RDBMS using an improved JDBC connector. The
>>>>>> >>>>>>> >> connection and the reading run smoothly, but I cannot seem
>>>>>> >>>>>>> >> to be able to move above the limit of 8 concurrent threads
>>>>>> >>>>>>> >> running. 8 is of course the number of cores of my machine.
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I have tried working around configurations and settings, but
>>>>>> >>>>>>> >> the Executor within the ExecutionContext keeps on having a
>>>>>> >>>>>>> >> parallelism of 8, although, of course, the parallelism of
>>>>>> >>>>>>> >> the execution environment is much higher (in fact I have
>>>>>> >>>>>>> >> many more tasks to be allocated).
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster
>>>>>> >>>>>>> >> configuration that may just override/neglect my wish for a
>>>>>> >>>>>>> >> higher degree of parallelism. Is there a way for me to work
>>>>>> >>>>>>> >> around this issue?
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> Please let me know. Thanks a lot for your help! :-)
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> saluti,
>>>>>> >>>>>>> >> Stefano
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
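
The diagnosis reached at the top of this thread (a bounded object pool, rather than Flink, capping the number of connections) can be sketched with the JDK alone. The `TinyPool` class below is a hypothetical stand-in and not the commons-pool2 API: it creates a new object only while fewer than `maxTotal` exist, and otherwise makes the borrower wait until one is returned. For what it is worth, commons-pool2's `GenericObjectPoolConfig` has a default `maxTotal` of 8 that does not depend on the core count, which would also produce exactly 8 connections on an 8-core laptop; this is worth verifying against the docs of the version in use.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    /** Hypothetical stand-in for a capped connection pool (not commons-pool2). */
    static final class TinyPool {
        private final int maxTotal;
        private final AtomicInteger created = new AtomicInteger();
        private final BlockingQueue<Object> idle = new LinkedBlockingQueue<>();

        TinyPool(int maxTotal) {
            this.maxTotal = maxTotal;
        }

        Object borrow() throws InterruptedException {
            Object conn = idle.poll();           // reuse a returned object if any
            if (conn != null) {
                return conn;
            }
            while (true) {
                int n = created.get();
                if (n >= maxTotal) {
                    return idle.take();          // cap reached: wait for a return
                }
                if (created.compareAndSet(n, n + 1)) {
                    return new Object();         // the "CREATING NEW CONNECTION" case
                }
            }
        }

        void giveBack(Object conn) {
            idle.add(conn);
        }

        int createdCount() {
            return created.get();
        }
    }

    /** Runs `tasks` threads against a pool capped at `maxTotal`; returns objects created. */
    public static int run(int tasks, int maxTotal) throws InterruptedException {
        TinyPool pool = new TinyPool(maxTotal);
        // The first min(tasks, maxTotal) borrowers hold their object until all
        // of them have one, so the number of created objects is deterministic.
        CountDownLatch holding = new CountDownLatch(Math.min(tasks, maxTotal));
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                try {
                    Object conn = pool.borrow();
                    holding.countDown();
                    holding.await();
                    pool.giveBack(conn);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return pool.createdCount();
    }

    public static void main(String[] args) throws InterruptedException {
        // 16 task threads, pool capped at 8: all 16 run, but only 8 objects exist.
        System.out.println("created = " + run(16, 8));   // prints "created = 8"
    }
}
```

All 16 threads complete, yet the creation counter stops at 8, which matches the symptom described above: the task count is correct while the "new connection" log line appears only 8 times.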

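Two claims from earlier in the thread can also be checked with the JDK alone: that a `ForkJoinPool` built with its default constructor gets a parallelism equal to the number of available processors, and that dedicated threads (one per task, as Ufuk describes for the task threads) are not limited by the core count. The class and method names below are made up for illustration, and none of this is Flink code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelismDemo {

    /** Parallelism of a ForkJoinPool created with the default constructor. */
    public static int defaultPoolParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Starts n dedicated threads that all wait on a common latch.
     * The latch opens only once every thread has started, so the
     * returned count is n only if all n threads were alive at once.
     */
    public static int concurrentDedicatedThreads(int n) throws InterruptedException {
        CountDownLatch allStarted = new CountDownLatch(n);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                allStarted.countDown();
                try {
                    if (allStarted.await(10, TimeUnit.SECONDS)) {
                        passed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cores                 = " + Runtime.getRuntime().availableProcessors());
        System.out.println("ForkJoinPool default  = " + defaultPoolParallelism());
        System.out.println("32 dedicated threads  = " + concurrentDedicatedThreads(32));
    }
}
```

On an 8-core machine the first two values would both be 8 while the third is 32, which is consistent with the replies above: the pool's parallelism matters only for work submitted to that pool, not for threads created directly.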