flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: threads, parallelism and task managers
Date Wed, 13 Apr 2016 08:29:54 GMT
Any feedback about our JDBC InputFormat issue?

On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> We've finally created a running example (for Flink 0.10.2) of our improved
> JDBC InputFormat that you can run from an IDE (it creates an in-memory
> Derby database with 1000 rows and batches of 10) at
> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
> The first time you run the program you have to comment out the following line:
>
>         stmt.executeUpdate("Drop Table users ");
>
> In your pom declare the following dependencies:
>
> <dependency>
>   <groupId>org.apache.derby</groupId>
>   <artifactId>derby</artifactId>
>   <version>10.10.1.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.commons</groupId>
>   <artifactId>commons-pool2</artifactId>
>   <version>2.4.2</version>
> </dependency>
>
> On my laptop I have 8 cores, and if I set parallelism to 16 I expect to see
> 16 calls to the connection pool (i.e. '==================== CREATING NEW
> CONNECTION!'), while I see only 8 (up to my maximum number of cores).
> The number of created tasks, however, is correct (16).
>
> I hope this helps in understanding where the problem is!
>
> Best, and thanks in advance,
> Flavio
>
> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>
>> Hi Ufuk,
>>
>> here is our preliminary input format implementation:
>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>
>> If you need a running project, I will have to create a test one because I
>> cannot share the current configuration.
>>
>> thanks a lot in advance!
>>
>>
>>
>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>
>>> Do you have the code somewhere online? Maybe someone can have a quick
>>> look over it later. I'm pretty sure that is indeed a problem with the
>>> custom input format.
>>>
>>> – Ufuk
>>>
>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>> wrote:
>>> > Perhaps there is a misunderstanding on my side over the parallelism and
>>> > split management given a data source.
>>> >
>>> > We started from the current JDBCInputFormat to make it multi-threaded.
>>> > Then, given a space of keys, we create the splits based on a fetch size
>>> > set as a parameter. In the open method, we get a connection from the
>>> > pool and execute a query using the split interval. This sets the
>>> > 'resultSet', and then the DataSourceTask iterates over reachedEnd, next
>>> > and close. On close, the connection is returned to the pool. We set
>>> > parallelism to 32, and we would expect 32 connections to be opened, but
>>> > only 8 are opened.
>>> >
>>> > We tried to make an example with the TextInputFormat, but since it is a
>>> > DelimitedInputFormat, open is called sequentially when statistics are
>>> > built, and the processing is executed in parallel only after all the
>>> > open calls have been executed. This is not feasible in our case, because
>>> > there would be millions of queries before the statistics are collected.
>>> >
>>> > Perhaps we are doing something wrong; we still have to figure out what. :-/
>>> >
>>> > thanks a lot for your help.
>>> >
>>> > saluti,
>>> > Stefano
>>> >
>>> >
>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>> >>
>>> >> That is exactly my point. I should have 32 threads running, but I have
>>> >> only 8. 32 tasks are created, but only 8 run concurrently. Flavio and I
>>> >> will try to make a simple program to reproduce the problem. If we solve
>>> >> our issues on the way, we'll let you know.
>>> >>
>>> >> thanks a lot anyway.
>>> >>
>>> >> saluti,
>>> >> Stefano
>>> >>
>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>> >>>
>>> >>> Then it shouldn't be a problem. The ExecutionContext is used to run
>>> >>> futures and their callbacks. But as Ufuk said, each task will spawn its
>>> >>> own thread, and if you set the parallelism to 32 then you should have
>>> >>> 32 threads running.
>>> >>>
>>> >>>
>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> In fact, I don't use it. I just had to trace back through the runtime
>>> >>>> implementation to get to the point where parallelism was switching
>>> >>>> from 32 to 8.
>>> >>>>
>>> >>>> saluti,
>>> >>>> Stefano
>>> >>>>
>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>> >>>>>
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> what do you use the ExecutionContext for? That should actually be
>>> >>>>> something you shouldn't be concerned with, since it is only used
>>> >>>>> internally by the runtime.
>>> >>>>>
>>> >>>>> Cheers,
>>> >>>>> Till
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bortoli@gmail.com>
>>> >>>>> wrote:
>>> >>>>>>
>>> >>>>>> Well, in theory yes. Each task has a thread, but only a certain
>>> >>>>>> number run in parallel (the job of the scheduler). Parallelism is set
>>> >>>>>> in the environment. However, whereas the parallelism parameter is set
>>> >>>>>> and read correctly, when it comes to actually starting the threads,
>>> >>>>>> the number is fixed at 8. We ran a debugger to get to the point where
>>> >>>>>> the thread was started. As Flavio mentioned, the ExecutionContext has
>>> >>>>>> the parallelism set to 8. We have a pool of connections to an RDBMS
>>> >>>>>> and it logs the creation of just 8 connections although parallelism
>>> >>>>>> is much higher.
>>> >>>>>>
>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>> >>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>>> >>>>>> assignments in the setup of the MiniCluster involving parallelism and
>>> >>>>>> 'default values'. Default values in terms of parallelism are based on
>>> >>>>>> the number of cores.
>>> >>>>>>
>>> >>>>>> thanks a lot for the support!
>>> >>>>>>
>>> >>>>>> saluti,
>>> >>>>>> Stefano
>>> >>>>>>
>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>> >>>>>>>
>>> >>>>>>> Hey Stefano,
>>> >>>>>>>
>>> >>>>>>> this should work by setting the parallelism on the environment, e.g.
>>> >>>>>>>
>>> >>>>>>> env.setParallelism(32)
>>> >>>>>>>
>>> >>>>>>> Is this what you are doing?
>>> >>>>>>>
>>> >>>>>>> The task threads are not part of a pool, but each submitted task
>>> >>>>>>> creates its own Thread.
>>> >>>>>>>
>>> >>>>>>> – Ufuk
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>> >>>>>>> <pompermaier@okkam.it> wrote:
>>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>>> >>>>>>> > creates the executionContext of the scheduler with
>>> >>>>>>> >
>>> >>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>>> >>>>>>> >
>>> >>>>>>> > and thus the number of concurrently running threads is limited to
>>> >>>>>>> > the number of cores (using the default constructor of the
>>> >>>>>>> > ForkJoinPool). What do you think?
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>> >>>>>>> > <s.bortoli@gmail.com>
>>> >>>>>>> > wrote:
>>> >>>>>>> >>
>>> >>>>>>> >> Hi guys,
>>> >>>>>>> >>
>>> >>>>>>> >> I am trying to test a job that should run a number of tasks to
>>> >>>>>>> >> read from an RDBMS using an improved JDBC connector. The connection
>>> >>>>>>> >> and the reading run smoothly, but I cannot seem to get above the
>>> >>>>>>> >> limit of 8 concurrently running threads. 8 is of course the number
>>> >>>>>>> >> of cores of my machine.
>>> >>>>>>> >>
>>> >>>>>>> >> I have tried working around configurations and settings, but the
>>> >>>>>>> >> Executor within the ExecutionContext keeps having a parallelism
>>> >>>>>>> >> of 8, although the parallelism of the execution environment is, of
>>> >>>>>>> >> course, much higher (in fact I have many more tasks to be allocated).
>>> >>>>>>> >>
>>> >>>>>>> >> I feel it may be an issue with the LocalMiniCluster configuration,
>>> >>>>>>> >> which may just override/neglect my wish for a higher degree of
>>> >>>>>>> >> parallelism. Is there a way for me to work around this issue?
>>> >>>>>>> >>
>>> >>>>>>> >> Please let me know. Thanks a lot for your help! :-)
>>> >>>>>>> >>
>>> >>>>>>> >> saluti,
>>> >>>>>>> >> Stefano
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>
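
The split creation described in the quoted message of 29 March (a key space cut into intervals of fetch-size keys, with one query per interval) could be sketched roughly as below. This is not the code from the linked gists. The names KeyRangeSplits, Range and createRanges are hypothetical, and the sketch assumes numeric keys with inclusive bounds that are far from the limits of long.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or of the gists in this thread.
class KeyRangeSplits {

    /** Inclusive key interval that a single split would query. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Cuts the inclusive key space [minKey, maxKey] into consecutive
     * intervals of at most fetchSize keys each.
     * Assumption: values are small enough that start + fetchSize
     * does not overflow a long.
     */
    public static List<Range> createRanges(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += fetchSize) {
            // The last interval may be shorter than fetchSize.
            long end = Math.min(start + fetchSize - 1, maxKey);
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 1000 keys in batches of 10, as in the Derby example in this thread.
        List<Range> ranges = createRanges(1, 1000, 10);
        System.out.println(ranges.size() + " ranges, first " + ranges.get(0)
                + ", last " + ranges.get(ranges.size() - 1));
    }
}
```

Each interval would then become the parameters of one bounded query (for example a BETWEEN clause on the key column) that a split runs in its open method. How the intervals are wrapped into Flink input splits is left out here.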

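The observation in the thread that the ExecutionContext showed a parallelism of 8 on an 8-core machine is consistent with how the no-argument ForkJoinPool constructor behaves in the JDK. The small check below uses only java.util.concurrent; the class name ForkJoinDefaults and its methods are made up for illustration. Note that, according to the replies quoted above, this pool only runs futures and their callbacks, so this check by itself does not show that the pool is what limits the number of connections.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of Flink.
class ForkJoinDefaults {

    /** Parallelism of a pool created with the default constructor. */
    public static int defaultPoolParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /** Parallelism of a pool created with an explicit target. */
    public static int explicitPoolParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("default ForkJoinPool parallelism: "
                + defaultPoolParallelism());
        System.out.println("ForkJoinPool(32) parallelism: "
                + explicitPoolParallelism(32));
    }
}
```

On a typical machine the first two printed numbers match, which is why a debugger would show 8 on an 8-core laptop regardless of the value passed to env.setParallelism.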