flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Question about Scheduling of Batch Jobs
Date Wed, 04 Jan 2017 13:15:52 GMT
Hi Konstantin,

the DataSet API tries to execute all operators as soon as possible.

I assume that in your case, Flink does not do this because it tries to
avoid a deadlock.
A dataflow that replicates data from the same source and joins it again
can deadlock, because all pipelines must make progress in order for the
source to finish.

Think of a simple example like this:

           /-- Map1 --\
Src --<                  >-Join
           \-- Map2 --/

If the join is executed as a hash join, one input (Map1) is used to build a
hash table. Only once the hash table is built, the other input (Map2) can
be consumed.
If both Map operators ran at the same time, Map2 would stall at some
point because it could not emit any more data, due to backpressure from
the not-yet-opened probe input of the hash join.
Once Map2 stalled, the Source would stall as well, and Map1 could not
continue to finish the build side. At that point we would have a deadlock.

Flink detects these situations and adds an artificial pipeline breaker in
the dataflow to prevent deadlocks. Due to the pipeline breaker, the build
side is completed before the probe side input is processed.
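In pure Java terms (a minimal in-memory sketch, not Flink's actual operator code), a hash join consumes its build input completely before it reads the first probe record, which is exactly why the probe pipeline is blocked while the build side runs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal in-memory hash join. The build phase must finish before the
 * probe phase starts, so the probe input cannot be consumed earlier.
 */
public class HashJoinSketch {

    /** Joins (key, value) pairs on the key; rows are int[]{key, value}. */
    static List<String> hashJoin(List<int[]> build, List<int[]> probe) {
        // Build phase: materialize the *entire* build side into a hash table.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int[] r : build) {
            table.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        // Probe phase: only now can probe records be consumed and matched.
        List<String> out = new ArrayList<>();
        for (int[] r : probe) {
            for (int v : table.getOrDefault(r[0], List.of())) {
                out.add(r[0] + ":" + v + "," + r[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> build = List.of(new int[]{1, 10}, new int[]{2, 20});
        List<int[]> probe = List.of(new int[]{1, 100}, new int[]{3, 300});
        System.out.println(hashJoin(build, probe)); // only key 1 matches
    }
}
```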

This also answers the question of which operator is executed first: the
operator on the build side of the first join. Hence, the join strategy
chosen by the optimizer (BUILD_FIRST, BUILD_SECOND) decides.
You can also give a manual JoinHint to control this. If you give a
SORT_MERGE hint, all three operators should run concurrently because both
join inputs are consumed concurrently for sorting.
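As a sketch with the DataSet API (the data and map functions are made up for illustration), the hint is passed directly to join():

```java
// Requires the Flink DataSet API dependencies (flink-java) on the classpath.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> src =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

        // Two branches reading from the same source, as in the diagram.
        DataSet<Tuple2<Integer, String>> map1 = src.map(
                new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    public Tuple2<Integer, String> map(Tuple2<Integer, String> t) {
                        return Tuple2.of(t.f0, t.f1 + "-1");
                    }
                });
        DataSet<Tuple2<Integer, String>> map2 = src.map(
                new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    public Tuple2<Integer, String> map(Tuple2<Integer, String> t) {
                        return Tuple2.of(t.f0, t.f1 + "-2");
                    }
                });

        // The sort-merge hint lets both inputs be consumed concurrently,
        // so no artificial pipeline breaker is needed.
        map1.join(map2, JoinHint.REPARTITION_SORT_MERGE)
            .where(0).equalTo(0)
            .print();
    }
}
```

JoinHint.REPARTITION_SORT_MERGE corresponds to the SORT_MERGE strategy mentioned above; the BROADCAST_HASH_* and REPARTITION_HASH_* values select hash joins with an explicit build side instead.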

Best, Fabian

2017-01-04 13:30 GMT+01:00 Konstantin Knauf <konstantin.knauf@tngtech.com>:

> Hi everyone,
> I have a basic question regarding scheduling of batch programs. Let's
> take the following graph:
>           -> Group Combine -> ...
>         /
> Source ----> Group Combine -> ...
>         \
>           -> Map -> ...
> So, a source followed by three operators with ship strategy
> "Forward" and exchange mode "pipelined".
> The three flows are later joined again, so that this results in a single
> job.
> When the job is started, at first only one of the operators immediately
> receives the input read by the source and can therefore run concurrently
> with the source. Once the source is finished, the other two operators
> are scheduled.
> Two questions about this:
> 1) Why doesn't the source forward the records to all three operators
> while still running?
> 2) How does the jobmanager decide which of the three operators
> receives the pipelined data first?
> Cheers and Thanks,
> Konstantin
> --
> Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
