airflow-dev mailing list archives

From Vijay Bhat <vijaysb...@gmail.com>
Subject Travis random failure investigation
Date Fri, 21 Oct 2016 22:25:27 GMT
Hi all,

*TL;DR* I have some news about the random Travis failures we've been
seeing. I found the root cause to be a subtle race condition in the
backfill execution loop that creates deadlocks every now and then. I'm
working on a PR right now that addresses the race condition, and would
appreciate a review from the committers once it's ready.

*Analysis:*
The common pattern I was seeing in the random build failures was a deadlock
in an arbitrary backfill job. Example below:

======================================================================
ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates
    job.run()
  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run
    self._execute()
  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute
    raise AirflowException(err)
nose.proxy.AirflowException:
---------------------------------------------------
BackfillJob is deadlocked. These tasks were unable to run:
{<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}

After digging into the backfill execution code and adding lots of logging,
I found a race condition vulnerability in the main backfill execution loop
(BackfillJob._execute) for a DAG run:

            # Triggering what is ready to get triggered
            while tasks_to_run and not deadlocked:
                not_ready.clear()

                for key, ti in list(tasks_to_run.items()):
                    ti.refresh_from_db(session=session, lock_for_update=True)
                ...
                ...
            # update dag run state
            run.update_state(session=session)
            if run.dag.is_paused:
                models.DagStat.clean_dirty([run.dag_id], session=session)

The problem is that the state of all task instances for a DAG run is not
read atomically in the loop; instead it is refreshed piecemeal (the
ti.refresh_from_db call above) as the loop executes. In a multiprocessing
scenario (like LocalExecutor), this leaves the door open to spuriously
detecting a deadlock and failing the backfill.

Here's an example sequence of events that can cause this failure. Let's say
we have a DAG with tasks A and B, where B depends on A (A -> B), and A has
been picked up by a worker (but not yet completed), which means B is not
ready to run. The backfill / local executor process is actively running the
main execution loop.

1. Let tasks_to_run be read as [B, A] in BackfillJob._execute

2. In the while loop, B is inspected first, and it's correctly identified
as not runnable (since A hasn't succeeded yet). B is added to not_ready.
Now, not_ready = [B]

3. The backfill / local executor process gets interrupted and control is
given to the worker process, which then runs task A and marks it as
complete in the DB (in the TaskInstance run method).

4. Control is given back to the backfill / local executor process that goes
on to inspect task A. It calls ti.refresh_from_db, and finds A is complete,
so it pops it off the tasks_to_run list. Now, tasks_to_run = [B]

5. The following code segment in the loop then incorrectly marks the DAG
run as deadlocked, and the backfill job fails:
                # If the set of tasks that aren't ready ever equals the set of
                # tasks to run, then the backfill is deadlocked
                if not_ready and not_ready == set(tasks_to_run):
                    deadlocked.update(tasks_to_run.values())
                    tasks_to_run.clear()
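
To see the flaw in isolation, here is a small self-contained toy (not
Airflow code; the names and the in-memory "db" are made up for
illustration) that plays out the sequence above: B is inspected while A is
still running, A then completes and gets popped off tasks_to_run, and the
deadlock check fires even though B would become runnable on the next pass.

    # Toy reproduction of the spurious deadlock (illustrative names only).
    # "db" plays the role of the metastore; the "worker" flips A to success
    # after B has already been inspected, mirroring steps 2-4 above.

    db = {"A": "running", "B": None}     # task states as stored in the DB
    tasks_to_run = {"B": "ti_B", "A": "ti_A"}
    not_ready = set()
    deadlocked = set()

    for key in ["B", "A"]:               # B happens to be inspected first
        if key == "A":
            db["A"] = "success"          # worker finishes A mid-iteration
        state = db[key]                  # piecemeal, non-atomic refresh
        if state == "success":
            tasks_to_run.pop(key)        # A is done, drop it from the map
        elif key == "B" and db["A"] != "success":
            not_ready.add(key)           # trigger rule: upstream A not done yet

    # The check at the bottom of the loop now sees not_ready == {"B"} and
    # set(tasks_to_run) == {"B"}, so it declares a deadlock that isn't real.
    if not_ready and not_ready == set(tasks_to_run):
        deadlocked.update(tasks_to_run.values())
        print("spurious deadlock:", deadlocked)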


The additional logging that I added confirmed this theory when I pushed it
to Travis. The important sections are the 'Trigger Rule' FAILED warnings
(an upstream task was still unfinished when the downstream task was
checked) and the later 'succeeded. Don't rerun.' lines for those same
upstream tasks:

[2016-10-21 04:01:19,879] {jobs.py:1689} DEBUG - Clearing out not_ready list
[2016-10-21 04:01:19,882] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> state None
[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: The task instance was not marked for retrying.
[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: The task did not have depends_on_past set.
[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Task Instance State' PASSED: Task state None was valid.

[2016-10-21 04:01:19,892] {models.py:1056} WARNING - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).successes=2, skipped=0, failed=0,upstream_failed=0, done=2,flag_upstream_failed=True, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']

[2016-10-21 04:01:19,892] {jobs.py:1745} DEBUG - Adding <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> to not_ready

[2016-10-21 04:01:19,896] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> state None
[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: The task instance was not marked for retrying.
[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: The task did not have depends_on_past set.
[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Task Instance State' PASSED: Task state None was valid.
[2016-10-21 04:01:19,925] {models.py:1056} WARNING - Dependencies not met for <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).successes=1, skipped=0, failed=0,upstream_failed=0, done=1,flag_upstream_failed=True, upstream_task_ids=['run_after_loop', 'also_run_this']
[2016-10-21 04:01:19,925] {jobs.py:1745} DEBUG - Adding <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> to not_ready

[2016-10-21 04:01:19,929] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_2 2016-01-02 00:00:00 [success]> state success
[2016-10-21 04:01:19,929] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_2 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.
[2016-10-21 04:01:19,947] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_0 2016-01-02 00:00:00 [success]> state success
[2016-10-21 04:01:19,947] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_0 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.
[2016-10-21 04:01:19,951] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_1 2016-01-02 00:00:00 [success]> state success
[2016-10-21 04:01:19,951] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_1 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.
[2016-10-21 04:01:20,879] {jobs.py:180} DEBUG - [heart] Boom

...
...

======================================================================
ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates
    job.run()
  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run
    self._execute()
  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute
    raise AirflowException(err)
nose.proxy.AirflowException:
---------------------------------------------------
BackfillJob is deadlocked. These tasks were unable to run:
{<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}

*How to fix:*
The main reason for the race condition is that we are not synchronizing
access to the task instances in a DAG run.

There are 3+n actors in the backfill system:

   - Backfill loop
   - LocalExecutor object
   - Metastore DB
   - n LocalWorker processes

The backfill loop and local executor run in the same process, so we don't
have to worry about synchronization between them. But we need to
synchronize access between the other actors. The channels of communication
in this context are:

   - Backfill loop <-> LocalExecutor = event_buffer
   - Backfill loop <-> Metastore = SQLAlchemy ORM
   - Metastore <-> LocalWorker = SQLAlchemy ORM

This means the backfill loop sees two versions of the task instance state:
one from the LocalExecutor event buffer (which gets updated when a worker
completes a task) and another from the metastore (which the worker also
writes to).

If we consider the metastore to be the source of truth, we can synchronize
access by reading the state of *all* task instances for the DAG run in a
single query before the "*for key, ti in list(tasks_to_run.items())*" loop
and removing individual task instance refreshes inside the loop.
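
As a rough sketch of that idea (not the actual PR; the helper name and
query shape are just my assumptions for illustration), the loop could work
off a single snapshot of the DAG run's task instance states taken once per
pass:

    # Sketch: read the state of every task instance in the DAG run with one
    # query, instead of calling ti.refresh_from_db() per task inside the loop.
    from airflow.models import TaskInstance as TI

    def snapshot_ti_states(session, dag_id, execution_date):
        rows = (session.query(TI.task_id, TI.state)
                       .filter(TI.dag_id == dag_id,
                               TI.execution_date == execution_date)
                       .all())
        return {task_id: state for task_id, state in rows}

    # Conceptual use inside BackfillJob._execute, once per pass of the
    # while loop, before "for key, ti in list(tasks_to_run.items())":
    #
    #     ti_states = snapshot_ti_states(session, run.dag_id, run.execution_date)
    #     for key, ti in list(tasks_to_run.items()):
    #         state = ti_states.get(ti.task_id)  # consistent view for this pass
    #         ...

This sketch doesn't cover the lock_for_update behavior of refresh_from_db;
it's only meant to show the single-read idea.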

I've implemented this fix and am seeing positive results so far. I'm
working through all the test cases before I push the PR out for review.

For folks familiar with the backfill code, I'd appreciate hearing your thoughts.

-Vijay
