flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: threads, parallelism and task managers
Date Thu, 07 Apr 2016 10:37:50 GMT
We've finally created a running example (for Flink 0.10.2) of our improved
JDBC InputFormat that you can run from an IDE (it creates an in-memory
Derby database with 1000 rows and a batch size of 10) at
https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
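
To make the numbers concrete: with 1000 rows and a batch size of 10, the
splits boil down to 100 key intervals. Below is a minimal sketch of that
computation only. It assumes contiguous numeric keys from 1 to 1000, and the
class and method names (KeyRangeSplits, splits) are hypothetical and not
taken from the gist.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRangeSplits {
    /** Returns inclusive [min, max] key ranges covering minKey..maxKey in chunks of fetchSize. */
    static List<long[]> splits(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += fetchSize) {
            // The last range is clipped so it never runs past maxKey.
            long end = Math.min(start + fetchSize - 1, maxKey);
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Assumed key space: 1..1000, batch size 10.
        List<long[]> ranges = splits(1, 1000, 10);
        System.out.println(ranges.size() + " splits, first = ["
                + ranges.get(0)[0] + ", " + ranges.get(0)[1] + "]");
    }
}
```

Each interval would then parameterize one range query when a split is opened.
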
The first time you run the program you have to comment out the following line:

        stmt.executeUpdate("Drop Table users ");
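
If you prefer not to comment the line in and out, a possible alternative is to
ignore only the "table does not exist" error. This is a sketch and not part of
the gist: the helper names are hypothetical, and it assumes that Derby reports
SQLState 42Y55 when DROP TABLE targets a missing table (as far as I know,
Derby has no DROP TABLE IF EXISTS).

```java
import java.sql.SQLException;
import java.sql.Statement;

final class DropIfExists {
    // Assumption: Derby uses SQLState 42Y55 for "cannot be performed because it does not exist".
    static boolean isMissingTable(SQLException e) {
        return "42Y55".equals(e.getSQLState());
    }

    /** Drops the table and swallows only the "does not exist" error; anything else is rethrown. */
    static void dropTableIfExists(Statement stmt, String table) throws SQLException {
        try {
            stmt.executeUpdate("DROP TABLE " + table);
        } catch (SQLException e) {
            if (!isMissingTable(e)) {
                throw e;
            }
        }
    }
}
```

With this, the call would become dropTableIfExists(stmt, "users").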

In your pom declare the following dependencies:

<dependency>
    <groupId>org.apache.derby</groupId>
    <artifactId>derby</artifactId>
    <version>10.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.4.2</version>
</dependency>

My laptop has 8 cores. If I set the parallelism to 16, I expect to see 16
calls to the connection pool (i.e. 16 log lines reading
'==================== CREATING NEW CONNECTION!'), but I see only 8 (my
number of cores). The number of created tasks, however, is correct (16).
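
For reference, this is the kind of behaviour we suspected earlier in this
thread: a ForkJoinPool created with its default constructor has a parallelism
equal to the number of available cores, so blocking tasks submitted to it do
not all run at once. The sketch below only demonstrates that general JVM
behaviour. It does not show that Flink schedules task threads this way (Ufuk
and Till say that each task spawns its own thread), and the class and method
names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolLimitDemo {
    /** Submits `tasks` sleeping tasks and returns the peak number running at the same time. */
    static int peakConcurrency(ExecutorService pool, int tasks) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(100); // stands in for a blocking JDBC query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }));
        }
        try {
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(); // default parallelism = number of cores
        System.out.println("cores=" + cores + ", pool parallelism=" + pool.getParallelism());
        System.out.println("peak concurrent tasks=" + peakConcurrency(pool, 2 * cores));
    }
}
```

If the same pattern applied to our job, the peak would stay at the number of
cores no matter how high the parallelism is set.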

I hope this helps in understanding where the problem is!

Best, and thanks in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bortoli@gmail.com>
wrote:

> Hi Ufuk,
>
> here is our preliminary input format implementation:
> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>
> if you need a running project, I will have to create a test one because I
> cannot share the current configuration.
>
> thanks a lot in advance!
>
>
>
> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>
>> Do you have the code somewhere online? Maybe someone can have a quick
>> look over it later. I'm pretty sure that is indeed a problem with the
>> custom input format.
>>
>> – Ufuk
>>
>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bortoli@gmail.com>
>> wrote:
>> > Perhaps there is a misunderstanding on my side over the parallelism and
>> > split management given a data source.
>> >
>> > We started from the current JDBCInputFormat to make it multi-threaded.
>> > Then, given a space of keys, we create the splits based on a fetch size
>> > set as a parameter. In open(), we get a connection from the pool and
>> > execute a query using the split interval. This sets the 'resultSet', and
>> > then the DataSourceTask iterates between reachedEnd, next and close. On
>> > close, the connection is returned to the pool. We set the parallelism to
>> > 32, and we would expect 32 connections to be opened, but only 8 are.
>> >
>> > We tried to make an example with the TextInputFormat, but since it is a
>> > DelimitedInputFormat, open is called sequentially while the statistics
>> > are built, and the processing is executed in parallel only after all the
>> > open calls have been executed. This is not feasible in our case, because
>> > there would be millions of queries before the statistics are collected.
>> >
>> > Perhaps we are doing something wrong, still to figure out what. :-/
>> >
>> > thanks a lot for your help.
>> >
>> > saluti,
>> > Stefano
>> >
>> >
>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>> >>
>> >> That is exactly my point. I should have 32 threads running, but I have
>> >> only 8. 32 tasks are created, but only 8 are run concurrently. Flavio
>> >> and I will try to make a simple program to reproduce the problem. If we
>> >> solve our issues on the way, we'll let you know.
>> >>
>> >> thanks a lot anyway.
>> >>
>> >> saluti,
>> >> Stefano
>> >>
>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>> >>>
>> >>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>> >>> futures and their callbacks. But as Ufuk said, each task will spawn its
>> >>> own thread, and if you set the parallelism to 32 then you should have
>> >>> 32 threads running.
>> >>>
>> >>>
>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bortoli@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> In fact, I don't use it. I just had to trace back through the runtime
>> >>>> implementation to get to the point where the parallelism was switching
>> >>>> from 32 to 8.
>> >>>>
>> >>>> saluti,
>> >>>> Stefano
>> >>>>
>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> what do you use the ExecutionContext for? That should actually be
>> >>>>> something you don't need to be concerned with, since it is only used
>> >>>>> internally by the runtime.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Till
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bortoli@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Well, in theory yes. Each task has a thread, but only a certain
>> >>>>>> number are run in parallel (the job of the scheduler). Parallelism is
>> >>>>>> set in the environment. However, whereas the parallelism parameter is
>> >>>>>> set and read correctly, when it comes to actually starting the
>> >>>>>> threads, the number is fixed at 8. We ran a debugger to get to the
>> >>>>>> point where the thread was started. As Flavio mentioned, the
>> >>>>>> ExecutionContext has the parallelism set to 8. We have a pool of
>> >>>>>> connections to an RDBMS and it logs the creation of just 8
>> >>>>>> connections, although the parallelism is much higher.
>> >>>>>>
>> >>>>>> My question is whether this is a bug (or a feature) of the
>> >>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>> >>>>>> assignments in the setup of the MiniCluster involving parallelism and
>> >>>>>> 'default values'. Default values in terms of parallelism are based on
>> >>>>>> the number of cores.
>> >>>>>>
>> >>>>>> thanks a lot for the support!
>> >>>>>>
>> >>>>>> saluti,
>> >>>>>> Stefano
>> >>>>>>
>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>> >>>>>>>
>> >>>>>>> Hey Stefano,
>> >>>>>>>
>> >>>>>>> this should work by setting the parallelism on the environment, e.g.
>> >>>>>>>
>> >>>>>>> env.setParallelism(32)
>> >>>>>>>
>> >>>>>>> Is this what you are doing?
>> >>>>>>>
>> >>>>>>> The task threads are not part of a pool, but each submitted task
>> >>>>>>> creates its own Thread.
>> >>>>>>>
>> >>>>>>> – Ufuk
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>> >>>>>>> <pompermaier@okkam.it> wrote:
>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>> >>>>>>> > creates the executionContext of the scheduler with
>> >>>>>>> >
>> >>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>> >>>>>>> >
>> >>>>>>> > and thus the number of concurrently running threads is limited to
>> >>>>>>> > the number of cores (using the default constructor of the
>> >>>>>>> > ForkJoinPool). What do you think?
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>> >>>>>>> > <s.bortoli@gmail.com>
>> >>>>>>> > wrote:
>> >>>>>>> >>
>> >>>>>>> >> Hi guys,
>> >>>>>>> >>
>> >>>>>>> >> I am trying to test a job that should run a number of tasks to
>> >>>>>>> >> read from an RDBMS using an improved JDBC connector. The connection
>> >>>>>>> >> and the reading run smoothly, but I cannot seem to move above the
>> >>>>>>> >> limit of 8 concurrently running threads. 8 is of course the number
>> >>>>>>> >> of cores of my machine.
>> >>>>>>> >>
>> >>>>>>> >> I have tried working around configurations and settings, but the
>> >>>>>>> >> Executor within the ExecutionContext keeps on having a parallelism
>> >>>>>>> >> of 8, although, of course, the parallelism of the execution
>> >>>>>>> >> environment is much higher (in fact I have many more tasks to be
>> >>>>>>> >> allocated).
>> >>>>>>> >>
>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration
>> >>>>>>> >> that may just override/neglect my wish for a higher degree of
>> >>>>>>> >> parallelism. Is there a way for me to work around this issue?
>> >>>>>>> >>
>> >>>>>>> >> please let me know. Thanks a lot for your help! :-)
>> >>>>>>> >>
>> >>>>>>> >> saluti,
>> >>>>>>> >> Stefano
