flink-dev mailing list archives

From Camelia Elena Ciolac <came...@chalmers.se>
Subject RE: Questions for a better understanding of the internals of data exchange between tasks
Date Wed, 20 Jan 2016 06:45:19 GMT
Dear Ufuk,

Thank you very much for this detailed explanation, it helped me understand.
So, many many thanks!


From: Ufuk Celebi [uce@apache.org]
Sent: Tuesday, January 19, 2016 7:16 PM
To: dev@flink.apache.org
Subject: Re: Questions for a better understanding of the internals of data exchange between tasks

On Tue, Jan 19, 2016 at 11:37 AM, Camelia Elena Ciolac <camelia@chalmers.se> wrote:

> Hello,
> I list some questions gathered while reading documentation on Flink's
> internals and I am grateful to receive your answers.
> 1) How is the JobManager involved in the communication between tasks
> running in task slots on TaskManagers?
> From [1] it appears to me that, as part of the control flow for data
> exchange, every time a task has a result that is consumable, it notifies
> the JobManager.
> This is repeated in [2] "When producing an intermediate result, the
> producing task is responsible to notify the JobManager about available
> data".
> However, the result's size might as well be 1, in the case of data
> streams, so I ask this question to understand better if for each data
> exchange the JobManager is involved.

The JobManager is involved when the result is consumable. Depending on the
result characteristic, this is either the case when the first record/buffer
has been produced (pipelined results) or when all records/buffers have been
produced (blocking results). The notification can either kick off the
deployment of the consuming tasks or update them with more specific
information about the location of the results to be consumed. The
notification does not happen for each buffer that is transferred. The data
exchange happens from task manager to task manager via a separate TCP
connection.
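To make the timing concrete, here is a minimal Java sketch of the rule above: at most one notification per partition, sent on the first buffer for a pipelined result and on completion for a blocking one. The names (`NotificationSketch`, `Partition`, `ResultType`) and the list that stands in for the Akka messages to the JobManager are hypothetical illustrations, not Flink's runtime classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of when a producer tells the JobManager that its
// result is consumable; not Flink's actual ResultPartition implementation.
class NotificationSketch {
    enum ResultType { PIPELINED, BLOCKING }

    static final class Partition {
        private final ResultType type;
        // Stands in for the messages that would be sent to the JobManager.
        final List<String> sentToJobManager = new ArrayList<>();
        private boolean notified = false;
        private int buffersProduced = 0;

        Partition(ResultType type) {
            this.type = type;
        }

        // Called by the producing task for every buffer it writes.
        void addBuffer() {
            buffersProduced++;
            // A pipelined result is consumable as soon as the first buffer exists.
            if (type == ResultType.PIPELINED) {
                notifyOnce();
            }
        }

        // Called when the producing task has written all of its buffers.
        void finish() {
            // A blocking result only becomes consumable here. For a pipelined
            // result this only fires if no buffer was ever produced.
            notifyOnce();
        }

        // At most one notification per partition, never one per buffer.
        private void notifyOnce() {
            if (!notified) {
                notified = true;
                sentToJobManager.add("consumable after " + buffersProduced + " buffer(s)");
            }
        }
    }

    public static void main(String[] args) {
        Partition pipelined = new Partition(ResultType.PIPELINED);
        Partition blocking = new Partition(ResultType.BLOCKING);
        for (int i = 0; i < 3; i++) {
            pipelined.addBuffer();
            blocking.addBuffer();
        }
        pipelined.finish();
        blocking.finish();
        System.out.println(pipelined.sentToJobManager); // [consumable after 1 buffer(s)]
        System.out.println(blocking.sentToJobManager);  // [consumable after 3 buffer(s)]
    }
}
```

Either way, three buffers produce exactly one message; only its timing differs.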

> 2) In case my understanding of the aforementioned control flow is correct,
> then from the following options, which is the data outcome that triggers a
> notification of the JobManager: a ResultPartition, a ResultSubpartition or
> a Buffer?

The ResultPartition, once it has at least one (pipelined) or all (blocking)
buffers produced. The sub partitions contain all data for one parallel
consumer sub task (for example, reducer sub task 1 out of a total of 4
reducer sub tasks).
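A small Java sketch of what "one sub partition per consumer sub task" means on the producing side. The hash-modulo routing is a simplification chosen for illustration and is an assumption, not necessarily how Flink assigns keys to sub tasks; `SubpartitionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical producer-side view of one ResultPartition: one sub partition
// per parallel consumer sub task. Not Flink's actual API.
class SubpartitionSketch {
    // subpartitions.get(i) holds everything consumer sub task i will read.
    final List<List<String>> subpartitions = new ArrayList<>();

    SubpartitionSketch(int numConsumers) {
        for (int i = 0; i < numConsumers; i++) {
            subpartitions.add(new ArrayList<>());
        }
    }

    // Assumed routing rule: hash of the key modulo the number of consumers,
    // so every occurrence of a key ends up with the same consumer.
    int route(String key) {
        return Math.floorMod(key.hashCode(), subpartitions.size());
    }

    void emit(String word) {
        subpartitions.get(route(word)).add(word);
    }

    public static void main(String[] args) {
        SubpartitionSketch partition = new SubpartitionSketch(4); // 4 reducer sub tasks
        for (String word : "to be or not to be".split(" ")) {
            partition.emit(word);
        }
        for (int i = 0; i < partition.subpartitions.size(); i++) {
            System.out.println("reducer sub task " + i + " <- " + partition.subpartitions.get(i));
        }
    }
}
```

Because both occurrences of "to" (and of "be") land in the same sub partition, a single reducer sub task sees all of them.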

> 3) Idem, then are the Akka actors involved in this notification of data
> availability for consumption?

Yes, all distributed coordination happens via Akka. The actual data
exchange happens via a custom TCP stack with Netty. As an example: the Akka
messages contain information about which result to request from which task
manager and the request and data transfer happens via TCP.
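The split between the two channels can be sketched as follows: the control message only names the result and where it lives, and the consumer then uses that information to request the data over a separate connection. Everything here is hypothetical: `PartitionLocation`, `dataRequest`, the host name, the port, and the request string format are made up for illustration and are not Flink's actual messages or wire protocol.

```java
// Hypothetical illustration of the control/data split; not Flink's messages.
class ControlDataSplitSketch {
    // Control plane (Akka in Flink): metadata only, no records.
    static final class PartitionLocation {
        final String resultPartitionId;
        final String host;
        final int dataPort;

        PartitionLocation(String resultPartitionId, String host, int dataPort) {
            this.resultPartitionId = resultPartitionId;
            this.host = host;
            this.dataPort = dataPort;
        }
    }

    // Data plane (a Netty-based TCP stack in Flink): the consumer uses the
    // location from the control message to ask the producing TaskManager for
    // one sub partition. The string format is invented for this example.
    static String dataRequest(PartitionLocation loc, int subpartition) {
        return "connect " + loc.host + ":" + loc.dataPort
                + ", request " + loc.resultPartitionId + "/" + subpartition;
    }

    public static void main(String[] args) {
        // Arbitrary example values standing in for what the JobManager sends.
        PartitionLocation fromJobManager =
                new PartitionLocation("result-0/partition-1", "taskmanager-2", 40001);
        System.out.println(dataRequest(fromJobManager, 0));
        // prints: connect taskmanager-2:40001, request result-0/partition-1/0
    }
}
```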

> 4) How is this co-existence of receiver-initiated data
> transfers (the consumer requests the data partitions) with the push data
> transfers [2] between 2 tasks orchestrated?

The JobManager acts as the coordinator of the system and holds all required
information in the ExecutionGraph data structure. The execution graph is an
asynchronous state machine used for scheduling and tracking the progress of
deployed tasks. During deployment, the tasks either know where to request
data from or they get updates during runtime if the result location is not
known at deployment time.
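The two cases (location known at deployment vs. filled in later by an update) can be sketched as a tiny state machine on the consumer side. `LocationUpdateSketch`, `Channel`, and `updateLocation` are hypothetical names, and the string locations stand in for real TaskManager addresses; this is not Flink's InputChannel API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer-side input channel; not Flink's InputChannel API.
class LocationUpdateSketch {
    static final class Channel {
        private String producerLocation; // null: unknown at deployment time
        final List<String> requestsSent = new ArrayList<>();

        // Deployment: the location may or may not be known yet.
        Channel(String locationAtDeployment) {
            this.producerLocation = locationAtDeployment;
            if (producerLocation != null) {
                requestsSent.add(producerLocation); // request right away
            }
        }

        // Runtime update from the JobManager once the producer's location is known.
        // An already known location is left untouched.
        void updateLocation(String location) {
            if (producerLocation == null) {
                producerLocation = location;
                requestsSent.add(location);
            }
        }
    }

    public static void main(String[] args) {
        Channel known = new Channel("taskmanager-1");
        Channel unknown = new Channel(null);
        System.out.println(known.requestsSent);   // [taskmanager-1]
        System.out.println(unknown.requestsSent); // []
        unknown.updateLocation("taskmanager-3");
        System.out.println(unknown.requestsSent); // [taskmanager-3]
    }
}
```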

> From [3] I retain the paragraph:
> "The JobManager also acts as the input split assigner for data sources. It
> is responsible for distributing the work across all TaskManagers such that
> data locality is preserved where possible. In order to dynamically balance
> the load, the Tasks request a new input split after they have finished
> processing the old one. This request is realized by sending a
> RequestNextInputSplit to the JobManager. The JobManager responds with a
> NextInputSplit message. If there are no more input splits, then the input
> split contained in the message is null.
> The Tasks are deployed lazily to the TaskManagers. This means that tasks
> which consume data are only deployed after one of their producers has
> finished producing some data. Once the producer has done so, it sends a
> ScheduleOrUpdateConsumers message to the JobManager. This message says
> that the consumer can now read the newly produced data. If the consuming
> task is not yet running, it will be deployed to a TaskManager."
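The split request/response loop in the quoted paragraph can be sketched in a few lines of Java. This is a hypothetical stand-in for the JobManager side (`SplitAssignerSketch` and `nextInputSplit` are made-up names); it ignores the data-locality preference mentioned above and only shows lazy, one-at-a-time assignment with `null` signalling that no splits remain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of lazy split assignment; ignores data locality,
// which the quoted text says the JobManager tries to preserve.
class SplitAssignerSketch {
    private final Deque<String> remaining;

    SplitAssignerSketch(List<String> splits) {
        this.remaining = new ArrayDeque<>(splits);
    }

    // Answers one RequestNextInputSplit; null means "no more splits",
    // mirroring the null split in the NextInputSplit reply described above.
    synchronized String nextInputSplit() {
        return remaining.poll();
    }

    public static void main(String[] args) {
        SplitAssignerSketch assigner =
                new SplitAssignerSketch(List.of("split-0", "split-1", "split-2"));
        String split;
        // A source task asks for a new split only after finishing the previous one.
        while ((split = assigner.nextInputSplit()) != null) {
            System.out.println("processing " + split);
        }
    }
}
```

Handing out one split per request is what lets faster tasks pick up more work, which is the load-balancing effect the quoted text describes.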
> 5) Can you please summarize how this data exchange occurs in Flink, based
> on an example?

WordCount with parallelism 2:

[ Source -> Map 0 ] |-> ResultPartition 0 with 2 sub partitions <--+--- [ Reduce 0 -> Sink 0 ]
[ Source -> Map 1 ] |-> ResultPartition 1 with 2 sub partitions <--+--- [ Reduce 1 -> Sink 1 ]

Assume that Result 0 is pipelined. At runtime, Result 0 is made up of
two ResultPartitions (one for each parallel Source->Map pipeline) with two
sub partitions each (one for each consuming Reduce->Sink pipeline). The
JobManager schedules the source tasks (which are chained with the mappers).
As soon as a Source->Map pipeline produces its first data, the JobManager
will receive an Akka message to deploy the consuming tasks (it will receive
multiple of these, of which one will be the first). Because Result 0 is
consumed in an all-to-all fashion, this triggers the deployment of both
reducers. At this point the JobManager tries to schedule the reducers,
which then request their respective sub partitions from both
ResultPartition 0 and 1, e.g. [Reduce 0->Sink 0] requests sub partition 0 of
ResultPartition 0 and 1, and [Reduce 1->Sink 1] requests sub partition 1 of
ResultPartition 0 and 1.
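The sub partition requests in this example follow a simple pattern: consumer sub task c asks every producer's ResultPartition for sub partition c. A short Java sketch of that mapping, with hypothetical class and method names and strings standing in for the actual network requests:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the all-to-all request pattern; not Flink's API.
class AllToAllRequestsSketch {
    // Consumer sub task `consumer` requests sub partition `consumer`
    // from the ResultPartition of every producer.
    static List<String> requestsFor(int consumer, int numProducers) {
        List<String> requests = new ArrayList<>();
        for (int producer = 0; producer < numProducers; producer++) {
            requests.add("ResultPartition " + producer + " / sub partition " + consumer);
        }
        return requests;
    }

    public static void main(String[] args) {
        int parallelism = 2; // WordCount with parallelism 2, as in the example
        for (int c = 0; c < parallelism; c++) {
            System.out.println("[Reduce " + c + " -> Sink " + c + "] requests "
                    + requestsFor(c, parallelism));
        }
        // [Reduce 0 -> Sink 0] requests [ResultPartition 0 / sub partition 0, ResultPartition 1 / sub partition 0]
        // [Reduce 1 -> Sink 1] requests [ResultPartition 0 / sub partition 1, ResultPartition 1 / sub partition 1]
    }
}
```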

This is the general flow of things (missing some detail regarding runtime
updates of the consumers). For blocking results, the notifications happen
when all parallel Source->Map pipelines have produced their data.

> Thank you!

Does this help? Of course, feel free to ask further questions.

– Ufuk
