flink-issues mailing list archives

From "Till Rohrmann (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly
Date Wed, 07 Jul 2021 10:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376468#comment-17376468 ]

Till Rohrmann commented on FLINK-23190:

{{cluster.evenly-spread-out-slots}} should only work for non-active deployments (e.g. standalone,
docker, etc.).
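
For reference, the option goes into flink-conf.yaml (a non-default example value is shown; the option defaults to false):

{code:yaml}
# Prefer spreading slots across all registered TaskManagers instead of
# filling TaskManagers one by one. Only effective for deployments with a
# static set of TaskManagers (standalone, Docker, etc.).
cluster.evenly-spread-out-slots: true
{code}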

I might not fully understand how the spreading out works exactly; maybe you can explain it
in a bit more detail. However, note that the {{JobMaster}} no longer sends {{AllocationIDs}}
to the {{ResourceManager}} since Flink 1.13.

The declarative resource management is the default since Flink 1.13. The {{AdaptiveScheduler}}
is also part of Flink since version {{1.13.0}}. These two features won't fully solve the
load-balancing problem because Flink operates under the assumption that all slots are equal. The
fact that spreading tasks across multiple {{TaskExecutors}} might be more efficient is due to
the poor resource isolation the JVM gives us.

> Make task-slot allocation much more evenly
> ------------------------------------------
>                 Key: FLINK-23190
>                 URL: https://issues.apache.org/jira/browse/FLINK-23190
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.12.3
>            Reporter: loyi
>            Priority: Major
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which are registered
at the time of scheduling, but our jobs all run in active YARN mode, and jobs with
smaller source parallelism often cause load-balance issues.
> For this job:
> {code:java}
> //  -ys 4 (4 slots per TM), max parallelism 40  =>  10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
>  .map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
> Suggestions:
> When a TaskManager starts registering its slots with the SlotManager, the current processing
logic chooses the first pendingSlot which meets its resource requirements.  This "random" strategy
usually causes uneven task allocation when the source operator's parallelism is significantly below
the processing operators'.   A simple, feasible idea is to {color:#de350b}partition{color} the
current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the AllocationID
carry this detail), then allocate the slots proportionally to each JobVertexGroup.
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot according to
its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
> I have implemented a [concept code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
 based on Flink-1.12.3; the patched version achieves {color:#de350b}fully even{color} task
allocation and works well on my workload.  Are there other points that have not been considered,
or does it conflict with future plans?      Sorry for my poor English.
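
The proportional grouping proposed above can be sketched as follows. This is a hypothetical illustration of the idea, not the linked concept code; the class and method names are invented:

{code:java}
import java.util.*;

public class EvenSlotSketch {
    // Hypothetical sketch: partition pending slot requests by the set of
    // job vertices they will host, then spread each group round-robin
    // across the TaskManagers so every group lands on distinct TMs.
    public static Map<String, List<Integer>> allocate(
            Map<String, Integer> groupSizes, int numTaskManagers) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : groupSizes.entrySet()) {
            List<Integer> assignedTms = new ArrayList<>();
            for (int i = 0; i < group.getValue(); i++) {
                assignedTms.add(i % numTaskManagers); // round-robin over TMs
            }
            placement.put(group.getKey(), assignedTms);
        }
        return placement;
    }

    public static void main(String[] args) {
        // The 40 pending slots from the example, grouped by job-vertex set.
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("ABCD", 10);
        groups.put("BCD", 10);
        groups.put("CD", 10);
        groups.put("D", 10);
        allocate(groups, 10).forEach((g, tms) ->
                System.out.println(g + " -> TMs " + new TreeSet<>(tms)));
    }
}
{code}

With 10 TaskManagers offering 4 slots each, every group ends up on 10 distinct TaskManagers, matching the allocation result quoted above.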

This message was sent by Atlassian Jira
