airflow-dev mailing list archives

From Vijay Bhat <vijaysb...@gmail.com>
Subject Re: Travis random failure investigation
Date Sat, 22 Oct 2016 20:32:05 GMT
Good point, Bolke.

The locking logic that keeps two workers from running the same task instance
should live in the run method of TaskInstance. It's mostly there (it uses
refresh_from_db with lock_for_update=True), but it doesn't cover the scenario
you brought up. The fix is to make sure the task instance is not already in a
running state before starting its execution:

>>>>> check that the task instance is not already running; return if it is

        if not test_mode:
            session.add(Log(State.RUNNING, self))
        self.state = State.RUNNING
        self.end_date = None
        if not test_mode:
            session.merge(self)
        session.commit()

Since the check happens before commit(), while the row lock taken by
refresh_from_db is still held, the current worker is the only one able to
execute this code segment for that particular task instance.
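
A minimal sketch of that check, in plain Python rather than the actual
models.py code. A threading.Lock stands in for the row lock taken by
refresh_from_db, and TaskInstanceRow, try_start and race are hypothetical
names made up for illustration:

```python
import threading

RUNNING = "running"


class TaskInstanceRow:
    """Hypothetical stand-in for one task instance row plus its row lock."""

    def __init__(self):
        self.state = None
        self.lock = threading.Lock()  # plays the role of the locked update


def try_start(row):
    """Return True only for the caller that wins the right to run the task."""
    with row.lock:                  # like refresh_from_db with a row lock
        if row.state == RUNNING:    # another worker already claimed it
            return False
        row.state = RUNNING         # claim it while still holding the lock
        return True                 # leaving the block stands in for commit()


def race(n_workers=8):
    """Let several workers try to start the same task instance at once."""
    row = TaskInstanceRow()
    results = []

    def worker():
        results.append(try_start(row))

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

However many workers call try_start, only one of them should get True back;
the rest see the running state and return early.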

I'll add the change and update the PR. I also found a code path that would
leave the refresh_from_db call uncommitted, which opens the door for a task
instance getting blocked indefinitely. I'll fix that as well.
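
For the uncommitted path, one option is to wrap the locked section so that
every exit either commits or rolls back. A rough sketch, assuming a
hypothetical FakeSession that only tracks which rows are locked (it is not
SQLAlchemy's Session API, and locked_row and maybe_run are made-up names):

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in that only records which rows are locked."""

    def __init__(self):
        self.locked = set()

    def lock_row(self, key):
        self.locked.add(key)

    def commit(self):
        self.locked.clear()    # committing releases every row lock

    def rollback(self):
        self.locked.clear()    # so does rolling back


@contextmanager
def locked_row(session, key):
    """Hold the row lock for the block, then always end the transaction."""
    session.lock_row(key)
    try:
        yield
    except Exception:
        session.rollback()
        raise
    else:
        session.commit()


def maybe_run(session, key, already_running):
    with locked_row(session, key):
        if already_running:
            return "skipped"   # the early return still goes through commit()
        return "started"
```

With this shape an early return or an exception inside the block cannot leave
the row locked, which is the failure mode described above.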

On Sat, Oct 22, 2016 at 5:43 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:

>
> Hi Vijay
>
> Great stuff! This has been bugging us for quite some time. At first
> glance your analysis seems correct.
>
> However, for the solution I'm wondering: neither the dag run nor its task
> instances are now locked in the db when executing. At the time of state
> change the taskinstance needs to be locked in order to prevent a second
> race condition. This is what the refresh from db was for.
>
> What happens if I startup a second scheduler / do a backfill by hand for
> the same dag_run at the same time?
>
> - Bolke
>
> > On 22 Oct 2016, at 00:25, Vijay Bhat <vijaysbhat@gmail.com> wrote:
> >
> > Hi all,
> >
> > *TL;DR* I have some news about the random Travis failures we've been
> > seeing. I found the root cause to be a subtle race condition in the
> > backfill execution loop that creates deadlocks every now and then. I'm
> > working on a PR right now that addresses the race condition, and would
> > appreciate a review from the committers once it's ready.
> >
> > *Analysis:*
> > The common pattern I was seeing in the random build failures was a
> > deadlock in an arbitrary backfill job. Example below:
> >
> > *========================================================*
> > *ERROR: test_backfill_multi_dates (tests.BackfillJobTest)*
> > *----------------------------------------------------------------------*
> > *Traceback (most recent call last):*
> > *  File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates*
> > *    job.run()*
> > *  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run*
> > *    self._execute()*
> > *  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute*
> > *    raise AirflowException(err)*
> > *nose.proxy.AirflowException: ---------------------------------------------------*
> > *BackfillJob is deadlocked. These tasks were unable to run:*
> > *{<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}*
> >
> > After digging into the backfill execution code and adding lots of logging,
> > I found a race condition vulnerability in the main backfill execution loop
> > (BackfillJob._execute) for a DAG run:
> >
> > *            # Triggering what is ready to get triggered*
> > *            while tasks_to_run and not deadlocked:*
> > *                not_ready.clear()*
> >
> > *                for key, ti in list(tasks_to_run.items()):*
> > *                    ti.refresh_from_db(session=session, lock_for_update=True)*
> > *                    ...*
> > *                    ...*
> > *            # update dag run state*
> > *            run.update_state(session=session)*
> > *            if run.dag.is_paused:*
> > *                models.DagStat.clean_dirty([run.dag_id], session=session)*
> >
> > The problem is that the state of all task instances for a DAG run is not
> > read atomically in the loop, but can be refreshed piecemeal (highlighted in
> > bold above) as the loop executes. In a multiprocessing scenario (like
> > LocalExecutor), this leaves the door open to spuriously detecting a
> > deadlock state and failing.
> >
> > Here's an example sequence of events that can cause this failure. Let's say
> > we have a DAG with tasks A and B, with B dependent on A (A -> B), and A has
> > been picked up by a worker (but not completed), which means B is not ready
> > to run. The backfill / local executor process is actively running the
> > backfill loop.
> >
> > 1. Let tasks_to_run be read as [B, A] in BackfillJob._execute
> >
> > 2. In the while loop, B is inspected first, and it's correctly identified
> > as not runnable (since A hasn't succeeded yet). B is added to not_ready.
> > Now, not_ready = [B]
> >
> > 3. The backfill / local executor process gets interrupted and control is
> > given to the worker process, which then runs task A and marks it as
> > complete in the DB (in the TaskInstance run method).
> >
> > 4. Control is given back to the backfill / local executor process, which
> > goes on to inspect task A. It calls ti.refresh_from_db, finds that A is
> > complete, and pops it off the tasks_to_run list. Now, tasks_to_run = [B]
> >
> > 5. The following code segment in the loop incorrectly marks the DAG run as
> > deadlocked and the backfill job is marked failed:
> > *                # If the set of tasks that aren't ready ever equals the set of*
> > *                # tasks to run, then the backfill is deadlocked*
> > *                if not_ready and not_ready == set(tasks_to_run):*
> > *                    deadlocked.update(tasks_to_run.values())*
> > *                    tasks_to_run.clear()*
> >
> >
> > The additional logging that I added confirmed this theory when I pushed it
> > to Travis. I've highlighted the important sections in bold:
> >
> > *[2016-10-21 04:01:19,879] {jobs.py:1689} DEBUG - Clearing out not_ready list*
> > *[2016-10-21 04:01:19,882] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> state None*
> > *[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: The task instance was not marked for retrying.*
> > *[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: The task did not have depends_on_past set.*
> > *[2016-10-21 04:01:19,882] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> dependency 'Task Instance State' PASSED: Task state None was valid.*
> >
> > *[2016-10-21 04:01:19,892] {models.py:1056} WARNING - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).successes=2, skipped=0, failed=0,upstream_failed=0, done=2,flag_upstream_failed=True, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']*
> >
> > *[2016-10-21 04:01:19,892] {jobs.py:1745} DEBUG - Adding <TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]> to not_ready*
> >
> > *[2016-10-21 04:01:19,896] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> state None*
> > *[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: The task instance was not marked for retrying.*
> > *[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: The task did not have depends_on_past set.*
> > *[2016-10-21 04:01:19,896] {models.py:1077} DEBUG - <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> dependency 'Task Instance State' PASSED: Task state None was valid.*
> > *[2016-10-21 04:01:19,925] {models.py:1056} WARNING - Dependencies not met for <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).successes=1, skipped=0, failed=0,upstream_failed=0, done=1,flag_upstream_failed=True, upstream_task_ids=['run_after_loop', 'also_run_this']*
> > *[2016-10-21 04:01:19,925] {jobs.py:1745} DEBUG - Adding <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]> to not_ready*
> >
> > *[2016-10-21 04:01:19,929] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_2 2016-01-02 00:00:00 [success]> state success*
> > *[2016-10-21 04:01:19,929] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_2 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.*
> > *[2016-10-21 04:01:19,947] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_0 2016-01-02 00:00:00 [success]> state success*
> > *[2016-10-21 04:01:19,947] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_0 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.*
> > *[2016-10-21 04:01:19,951] {jobs.py:1701} DEBUG - Task instance to run <TaskInstance: example_bash_operator.runme_1 2016-01-02 00:00:00 [success]> state success*
> > *[2016-10-21 04:01:19,951] {jobs.py:1707} DEBUG - Task instance <TaskInstance: example_bash_operator.runme_1 2016-01-02 00:00:00 [success]> succeeded. Don't rerun.*
> > *[2016-10-21 04:01:20,879] {jobs.py:180} DEBUG - [heart] Boom*
> >
> > *...*
> > *...*
> >
> > *======================================================*
> > *ERROR: test_backfill_multi_dates (tests.BackfillJobTest)*
> > *----------------------------------------------------------------------*
> > *Traceback (most recent call last):*
> > *  File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates*
> > *    job.run()*
> > *  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run*
> > *    self._execute()*
> > *  File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute*
> > *    raise AirflowException(err)*
> > *nose.proxy.AirflowException: ---------------------------------------------------*
> > *BackfillJob is deadlocked. These tasks were unable to run:*
> > *{<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}*
> >
> > *How to fix:*
> > The main reason for the race condition is that we are not synchronizing
> > access to the task instances in a DAG run.
> >
> > There are 3+n actors in the backfill system:
> >
> >   - Backfill loop
> >   - LocalExecutor object
> >   - Metastore DB
> >   - n LocalWorker processes
> >
> > The backfill loop and local executor run in the same process, so we don't
> > have to worry about synchronization between them. But we need to
> > synchronize access between the other actors. The channels of communication
> > in this context are:
> >
> >   - Backfill loop <-> LocalExecutor = event_buffer
> >   - Backfill loop <-> Metastore = SQLAlchemy ORM
> >   - Metastore <-> LocalWorker = SQLAlchemy ORM
> >
> > This means the backfill loop has two versions of the task instance state:
> > one from the LocalExecutor event buffer (which gets updated when the worker
> > completes a task) and another from the metastore (which the worker also
> > writes to).
> >
> > If we consider the metastore to be the source of truth, we can synchronize
> > access by reading the state of *all* task instances for the DAG run in a
> > single query before the "*for key, ti in list(tasks_to_run.items())*" loop
> > and removing the individual task instance refreshes inside the loop.
> >
> > I've implemented this fix and am seeing positive results so far. I'm
> > working through all the test cases before I push the PR out for review.
> >
> > Folks familiar with the backfill code, I'd appreciate hearing your thoughts.
> >
> > -Vijay
>
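
The interleaving in steps 1 to 5 of the quoted analysis can be replayed
deterministically. The sketch below is a toy model, not the BackfillJob code:
backfill_pass, replay and the dict-based "metastore" are made up for
illustration, and the worker finishing task A is injected by hand between the
two inspections. It compares live per-task reads with a single up-front read
of all states.

```python
def backfill_pass(order, upstream, db, worker_step, snapshot):
    """Run one simplified pass of the loop; return True if it reports a deadlock."""
    tasks_to_run = list(order)
    not_ready = set()
    # snapshot=True reads every state once up front; otherwise read live.
    states = dict(db) if snapshot else db
    for index, task in enumerate(order):
        if states[task] == "success":
            tasks_to_run.remove(task)          # done, nothing left to run
        elif not all(states[u] == "success" for u in upstream[task]):
            not_ready.add(task)                # upstream not finished yet
        # (a task that is still running simply stays in tasks_to_run)
        if index == 0:
            worker_step(db)                    # worker interrupts the loop here
    return bool(not_ready) and not_ready == set(tasks_to_run)


def replay(snapshot):
    upstream = {"A": [], "B": ["A"]}
    db = {"A": "running", "B": None}           # A picked up but not completed

    def worker_finishes_a(metastore):
        metastore["A"] = "success"

    return backfill_pass(["B", "A"], upstream, db, worker_finishes_a, snapshot)


print(replay(snapshot=False))  # live reads: B looks blocked, A looks done
print(replay(snapshot=True))   # one consistent read: A still counts as running
```

With live reads the pass ends with not_ready equal to tasks_to_run and reports
a deadlock. With the snapshot, A is still treated as running for this pass, so
no deadlock is reported, and the next pass would see A as succeeded and B as
runnable.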
