airflow-commits mailing list archives

From "Vijay Bhat (JIRA)" <>
Subject [jira] [Created] (AIRFLOW-585) Fix race condition in backfill execution loop
Date Fri, 21 Oct 2016 22:55:58 GMT
Vijay Bhat created AIRFLOW-585:

             Summary: Fix race condition in backfill execution loop
                 Key: AIRFLOW-585
             Project: Apache Airflow
          Issue Type: Bug
          Components: executor, travis
         Environment: LocalExecutor, CeleryExecutor
            Reporter: Vijay Bhat
            Assignee: Vijay Bhat

I found this bug while investigating random Travis build failures. The root cause is a subtle
race condition in the backfill execution loop that intermittently makes backfill jobs fail with
a spurious deadlock.

The common pattern I was seeing in the random build failures was a deadlock in an arbitrary
backfill job. Example below:

ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/tests/", line 100, in test_backfill_multi_dates
  File "/home/travis/build/apache/incubator-airflow/airflow/", line 194, in run
  File "/home/travis/build/apache/incubator-airflow/airflow/", line 1894, in _execute
    raise AirflowException(err)
nose.proxy.AirflowException: ---------------------------------------------------
BackfillJob is deadlocked. These tasks were unable to run:
{<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance:
example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}

After digging into the backfill execution code and adding lots of logging, I found a race
condition in the main backfill execution loop (BackfillJob._execute) for a DAG run:
            # Triggering what is ready to get triggered
            while tasks_to_run and not deadlocked:
                ...
                for key, ti in list(tasks_to_run.items()):
                    ti.refresh_from_db(session=session, lock_for_update=True)
                    ...
            # update dag run state
            ...
            if run.dag.is_paused:
                models.DagStat.clean_dirty([run.dag_id], session=session)

The problem is that the loop does not read the state of all task instances for a DAG run
atomically. Instead, each task instance is refreshed individually (the ti.refresh_from_db call
above) as the loop executes. In a multiprocessing setup (like LocalExecutor), this leaves the
door open to spuriously detecting a deadlock and failing the backfill job.

Here's an example sequence of events that can cause this failure. Say we have a DAG with tasks
A and B, where B depends on A (A -> B), and A has been picked up by a worker but has not
completed yet, so B is not ready to run. Meanwhile, the backfill / local executor process is
actively running the backfill execution loop.

1. Let tasks_to_run be read as [B, A] in BackfillJob._execute

2. In the while loop, B is inspected first, and it's correctly identified as not runnable
(since A hasn't succeeded yet). B is added to not_ready. Now, not_ready = [B]

3. The backfill / local executor process gets interrupted and control is given to the worker
process, which finishes running task A and marks it as complete in the DB (in the TaskInstance
run method).

4. Control is given back to the backfill / local executor process, which goes on to inspect
task A. It calls ti.refresh_from_db, finds that A is complete, and pops it off tasks_to_run.
Now, tasks_to_run = [B]

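5. The following check in the loop now incorrectly marks the DAG run as deadlocked, because
not_ready and set(tasks_to_run) are both {B}, even though B became runnable as soon as A
succeeded. As a result, the backfill job is marked failed: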
                # If the set of tasks that aren't ready ever equals the set of
                # tasks to run, then the backfill is deadlocked
                if not_ready and not_ready == set(tasks_to_run):
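To make the interleaving in steps 1-5 concrete, here is a deterministic replay in Python. It is
a simplified model, not Airflow code: a plain dict `db` stands in for the metastore, reading
`db[key]` stands in for ti.refresh_from_db, and the hypothetical `worker_step` hook plays the
LocalWorker finishing A right after B has been inspected. The state names and the `racy_pass`
helper are assumptions made for illustration; only the final return expression mirrors the
deadlock check quoted above.

```python
# Simplified replay of steps 1-5 (a model, not Airflow's actual code).
SUCCESS, RUNNING = "success", "running"

upstream = {"A": set(), "B": {"A"}}  # B depends on A (A -> B)
db = {"A": RUNNING, "B": None}       # stand-in for the metastore


def worker_step():
    """Hypothetical hook: the LocalWorker finishes A and writes it to the DB."""
    db["A"] = SUCCESS


def racy_pass(tasks_to_run, interleave_after=None):
    """One pass of the for-loop, refreshing each task instance separately."""
    not_ready = set()
    for key in list(tasks_to_run):
        state = db[key]  # per-task refresh, like ti.refresh_from_db
        if state == SUCCESS:
            tasks_to_run.pop(key)  # step 4: A is removed
        elif state == RUNNING:
            pass  # already handed to a worker, nothing to do
        elif all(db[up] == SUCCESS for up in upstream[key]):
            db[key] = RUNNING  # dependencies met: hand to the executor
        else:
            not_ready.add(key)  # step 2: B is not ready yet
        if key == interleave_after:
            worker_step()  # step 3: the worker runs between inspections
    # step 5: the deadlock check quoted above
    return bool(not_ready) and not_ready == set(tasks_to_run)


tasks_to_run = {"B": "ti_B", "A": "ti_A"}  # step 1: read as [B, A]
deadlocked = racy_pass(tasks_to_run, interleave_after="B")
print(deadlocked, db)  # prints: True {'A': 'success', 'B': None}
```

B's dependency check and A's own refresh read the "DB" at two different moments, so within a
single pass the loop sees A as both unfinished and finished, and the check fires even though
nothing is actually blocked.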

How to fix:
The main reason for the race condition is that we are not synchronizing access to the task
instances in a DAG run. 

There are 3+n actors in the backfill system:
- Backfill loop
- LocalExecutor object
- Metastore DB
- n LocalWorker processes

The backfill loop and the LocalExecutor run in the same process, so we don't have to worry
about synchronization between them, but we do need to synchronize access among the other
actors. The channels of communication in this context are:
- Backfill loop <-> LocalExecutor: event_buffer
- Backfill loop <-> Metastore: SQLAlchemy ORM
- Metastore <-> LocalWorker: SQLAlchemy ORM
This means the backfill loop has two views of each task instance's state: one from the
LocalExecutor event buffer (which gets updated when the worker completes a task) and another
from the metastore (which the worker also writes to).

If we consider the metastore to be the source of truth, we can synchronize access by reading
the state of all task instances for the DAG run in a single query before the "for key, ti
in list(tasks_to_run.items())" loop and removing individual task instance refreshes inside
the loop.
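A minimal sketch of the proposed fix, under the same simplified A -> B model, using an
in-memory SQLite table as a stand-in for the metastore. The table layout and the helpers
`snapshot_states`, `set_state`, and `snapshot_pass` are hypothetical, not Airflow's schema or
API. All states for the DAG run are read with one SELECT at the top of each pass, and every
decision in that pass uses that snapshot, so the worker finishing A mid-pass cannot make A and
B look inconsistent.

```python
import sqlite3

SUCCESS, RUNNING = "success", "running"
DAG_ID, RUN_DATE = "example_dag", "2016-01-02"
upstream = {"A": set(), "B": {"A"}}  # B depends on A (A -> B)

# Toy stand-in for the metastore (not Airflow's real schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance "
             "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)")
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)",
                 [(DAG_ID, "A", RUN_DATE, RUNNING), (DAG_ID, "B", RUN_DATE, None)])


def snapshot_states(dag_id, execution_date):
    """Read the state of every task instance in the DAG run with ONE query."""
    rows = conn.execute(
        "SELECT task_id, state FROM task_instance "
        "WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date)).fetchall()
    return dict(rows)


def set_state(task_id, state):
    """Write a task instance state (used by both the loop and the 'worker')."""
    conn.execute(
        "UPDATE task_instance SET state = ? "
        "WHERE dag_id = ? AND execution_date = ? AND task_id = ?",
        (state, DAG_ID, RUN_DATE, task_id))


def snapshot_pass(tasks_to_run, interleave_after=None):
    """One pass of the for-loop, deciding everything from a single snapshot."""
    states = snapshot_states(DAG_ID, RUN_DATE)  # no refreshes inside the loop
    not_ready = set()
    for key in list(tasks_to_run):
        state = states[key]
        if state == SUCCESS:
            tasks_to_run.pop(key)
        elif state == RUNNING:
            pass  # already handed to a worker
        elif all(states[up] == SUCCESS for up in upstream[key]):
            set_state(key, RUNNING)  # dependencies met: hand to the executor
        else:
            not_ready.add(key)
        if key == interleave_after:
            set_state("A", SUCCESS)  # worker finishes A mid-pass, as in step 3
    return bool(not_ready) and not_ready == set(tasks_to_run)


tasks_to_run = {"B": "ti_B", "A": "ti_A"}
first = snapshot_pass(tasks_to_run, interleave_after="B")  # same interleaving
second = snapshot_pass(tasks_to_run)
print(first, second, sorted(tasks_to_run))  # prints: False False ['B']
```

With the same interleaving, the first pass no longer trips the deadlock check: A was RUNNING in
the snapshot, so it stays in tasks_to_run and not_ready ({B}) differs from {A, B}. The second
pass sees A as succeeded and hands B to the executor. The trade-off is that a task finishing
mid-pass is only noticed one pass later, which is harmless compared to a false deadlock.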

This message was sent by Atlassian JIRA
