airflow-commits mailing list archives

From "Gerard Toonstra (JIRA)" <>
Subject [jira] [Commented] (AIRFLOW-72) Implement proper capacity scheduler
Date Sun, 06 Nov 2016 22:12:58 GMT


Gerard Toonstra commented on AIRFLOW-72:

-- in depth analysis --

The main reason there's oversubscription is that the 'state' a pool is in at any moment
in time is resolved by looking at task instance state and comparing it against the number of
slots configured for the pool. That is, at the time a task is added to the pool for execution,
the pool slot isn't immediately seen as reserved; it will only be seen as "reserved" once the
task instance is actually RUNNING. Between sending a task instance to the executor and the
task instance running on a worker, there's a time window of opportunity for race conditions
to occur. Depending on how many workers are available and how quickly tasks are picked
up, this can have a significant impact.
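The state-counting logic above can be sketched as a minimal simulation (the function and state names are illustrative, not Airflow's actual code):

```python
# Minimal sketch of how pool occupancy is derived from task instance
# state: only RUNNING instances count against the pool, so instances
# already sent to the executor but not yet picked up are invisible.

def open_slots(pool_size, task_states):
    """Count free slots by looking only at RUNNING task instances."""
    used = sum(1 for s in task_states if s == "RUNNING")
    return pool_size - used

# A pool of 2 slots with 2 tasks already sent to the executor but not
# yet picked up by a worker: both are QUEUED, so neither is counted.
states = ["QUEUED", "QUEUED"]
print(open_slots(2, states))  # the pool still looks completely free
```

During the window before workers flip those instances to RUNNING, the scheduler sees a fully open pool and can send more work.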

Within a single loop, the scheduler will not normally add tasks to a pool beyond its
capacity, unless task instances only start RUNNING after more than the "scheduler_heartbeat_sec"
interval has elapsed.

In the meantime, the DagFileProcessor is running and can add more task instances in
the SCHEDULED state. At the next heartbeat, the scheduler could send these task instances
to the executor and violate the pool slot setting, as long as the workers have not yet started
to run the earlier task instances and updated their status to RUNNING. So there's a complex interplay
between scheduler_heartbeat_sec, parallelism, celeryd_concurrency and worker congestion that influences
this. The extra dependency check on pools at the time a task instance starts helps to
mitigate the issue, but doesn't make it 100% race-proof: tasks A and
B starting at the same time may not see each other's update to "RUNNING" at the time each runs
its SELECT. I have still observed small violations of the pool resource limit; the size of the
violation depends on the pool size and how many tasks get queued into the executor.
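The check-then-act race between tasks A and B can be modeled deterministically (an illustrative simulation, not Airflow code):

```python
# Sketch of the check-then-act race: both task instances run their
# pool dependency check (the SELECT) before either has transitioned
# to RUNNING, so both pass and the pool is oversubscribed.

def pool_dep_check(pool_size, db_states):
    """Each task passes if the pool has a free slot *at check time*."""
    running = sum(1 for s in db_states.values() if s == "RUNNING")
    return running < pool_size

db = {"A": "QUEUED", "B": "QUEUED"}
pool_size = 1

# Both tasks SELECT before either UPDATE is visible to the other:
a_ok = pool_dep_check(pool_size, db)
b_ok = pool_dep_check(pool_size, db)
db["A"] = "RUNNING"
db["B"] = "RUNNING"  # a pool of 1 slot now runs 2 tasks
print(a_ok, b_ok)  # True True: both checks passed
```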

-- potential solution --

AIRFLOW-41 also discusses this issue and mentions explicitly allocating pool slots
to task instances.

A PoolReservation db class could be created when a ti is sent to the executor. The task instance,
if assigned to a pool, would then look for the pool reservation (in a dep) and only start running
once it found one. The scheduler would count the PoolReservations for a pool to decide the
number of open slots. If only the scheduler hands out these reservations, this would make
it work 100%. The downside is that this database class has to be kept in sync with
state changes to the TI, which is a lot of hassle and could lead to a whole lot of extra complexity.
It would also raise questions such as "Should a TI in RETRY state keep its reservation,
or is it reassigned when rescheduled?".
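The reservation scheme could be sketched as follows; the class and method names are hypothetical, not an existing Airflow API:

```python
# Hypothetical PoolReservation bookkeeping: slots are counted from
# reservations handed out by the scheduler, not from TI state.
import uuid


class PoolReservationStore:
    def __init__(self, slots):
        self.slots = slots
        self.reservations = {}  # ti key -> reservation id

    def open_slots(self):
        # Open slots derive from reservations, so a TI that was sent
        # to the executor but isn't RUNNING yet still occupies a slot.
        return self.slots - len(self.reservations)

    def reserve(self, ti_key):
        """Called by the scheduler when sending a TI to the executor."""
        if self.open_slots() <= 0:
            return None
        rid = str(uuid.uuid4())
        self.reservations[ti_key] = rid
        return rid

    def has_reservation(self, ti_key):
        """Checked by the TI's dep before it starts running."""
        return ti_key in self.reservations

    def release(self, ti_key):
        self.reservations.pop(ti_key, None)


store = PoolReservationStore(slots=1)
assert store.reserve("dag.task_a") is not None
assert store.reserve("dag.task_b") is None  # pool full: no reservation
```

Since only the scheduler calls reserve(), the handout is serialized and the pool cannot be oversubscribed; the maintenance burden is keeping release() in sync with every TI state transition, as noted above.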

What's not yet done correctly in the current situation is that used slots should be counted
over both "RUNNING" and "QUEUED"; at the moment only RUNNING is checked. With that strategy,
an in-memory PoolHandler can recover at scheduler startup by reading that state from the database
and bootstrapping, which should happen before execute_task_instances is run for the first time.
It can frequently update the pool state to synchronize with what's really happening, as long as
this is done in the same thread as the job that runs "execute_task_instances".
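A minimal sketch of such an in-memory handler, assuming illustrative names (PoolHandler is not an existing Airflow class):

```python
# Sketch of an in-memory PoolHandler that counts both RUNNING and
# QUEUED task instances as occupying slots, and bootstraps from
# persisted TI state at scheduler startup.

USED_STATES = {"RUNNING", "QUEUED"}


class PoolHandler:
    def __init__(self, slots):
        self.slots = slots
        self.used = 0

    def bootstrap(self, ti_states):
        """Read persisted TI state once, before the first
        execute_task_instances call, so the counter starts correct."""
        self.used = sum(1 for s in ti_states if s in USED_STATES)

    def open_slots(self):
        return self.slots - self.used


handler = PoolHandler(slots=4)
handler.bootstrap(["RUNNING", "QUEUED", "SUCCESS", "SCHEDULED"])
print(handler.open_slots())  # 2: RUNNING and QUEUED each occupy a slot
```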

When testing a potential solution, I'd recommend starting by temporarily disabling PoolHasSpaceDep
and adding a 2-3 x scheduler_heartbeat_sec delay into the executor before tasks are actually
run, to amplify the race condition.

> Implement proper capacity scheduler
> -----------------------------------
>                 Key: AIRFLOW-72
>                 URL:
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: pools, scheduler
>    Affects Versions: Airflow 1.7.1
>            Reporter: Bolke de Bruin
>              Labels: pool, queue, scheduler
>             Fix For: Airflow 2.0
> The scheduler is supposed to maintain queues and pools according to a "capacity" model.
However it is currently not properly implemented, and therefore issues such as being able to oversubscribe
pools exist, race conditions for queuing/dequeuing exist, and probably others.
> This Jira Epic is to track all related issues to pooling/queuing and the (tbd) roadmap
to a proper capacity scheduler.
> Why queuing / scheduling is broken:
> Locking is not properly implemented, and cannot be, as the check for slot availability is
spread throughout the scheduler, taskinstance and executor. This makes obtaining a slot non-atomic
and results in oversubscribing. In addition it leads to race conditions, such as two tasks
being picked from the queue at the same time when the scheduler determines that a queued task
still needs to be sent to the executor, while in an earlier run this already happened.
> In order to fix this Pool handling needs to be centralized (code wise) and work with
a mutex (with_for_update()) on the database records. The scheduler/taskinstance can then do
something like:
> slot = Pool.obtain_slot(pool_id)
> Pool.release_slot(slot)

This message was sent by Atlassian JIRA
