airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luke Bodeen (Jira)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1510) Scheduler: priority_weight sorting not applied across multiple DAGs
Date Wed, 08 Jan 2020 18:33:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010905#comment-17010905
] 

Luke Bodeen commented on AIRFLOW-1510:
--------------------------------------

[~ash] mentioned de-coupling dag parsing from scheduling but that is a much larger effort,
one which might already be in flight?

> Scheduler: priority_weight sorting not applied across multiple DAGs
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1510
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.8.1
>         Environment: Ubuntu, CeleryRunner
>            Reporter: Joseph Harris
>            Priority: Major
>
> h3. Issue
> When there are multiple available tasks across many DAGs, the order in which those tasks
are queued does not respect the priority_weighting order across all available tasks. The run
order is instead dependent on how quickly the Scheduler loop reaches the DAG, when the number
of DAGs is greater than the scheduler 'max_threads' variable.
> This is particularly problematic when there are long-running tasks competing over a limited
slots in a pool.
> With over 80 DAGs in operations, increasing max_threads to this number doesn't seem like
a practical solution.
> h3. What should be done
> * The docs should be updated to be less misleading about how priority_weight is likely
to behave: https://airflow.incubator.apache.org/concepts.html#pools
> * Potential implementation improvements on the scheduler: force the ProcessorManager
to wait for all jobs to be processed (slow but reliable) - or make _execute_task_instances()
look at tasks from all DAGs (faster but less reliable).
> h3. Example
> For instance, with 4 tasks:
> || DAG ||Task||Priority||Pool||
> || A || 1 |20|pool|
> || B || 2 |1|pool|
> || C || 3 |1|pool|
> || D || 4 |100|pool|
> The scheduler would look at DAGs A & B first, and send the tasks in order (1, 2).
Then the scheduler would look at 3 & 4 and send these in order (4, 3) if there are enough
pool slots available.
> h3. Current Implementation Detail
> The SchedulerJob code is a bit complex, but the sequence of events in the scheduler loop
looks like this:
> * The [DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
loops across all discovered DAG files, and launches Processor threads (limited by max_threads).
Each processor reads in a single DAG, and checks whether any tasks in the DAG have the dependencies
met. If dependencies are met, the task is set to state='scheduled'.
> * [DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409]
returns a list of DAGs returned by the Processor threads during its last cycle. When max_threads
= 2, this list will contain a maximum of 2 DAGs.
> * These DAGs are [passed to SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440]
> * An [ORM query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992]
selects tasks where state='scheduled' and the task is in the DAGs returned by the last heartbeat()
loop. *Only those tasks are [sorted and passed to the Celery queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message