airflow-commits mailing list archives

From "George Leslie-Waksman (JIRA)" <>
Subject [jira] [Commented] (AIRFLOW-1463) Clear state of queued task when it fails due to DAG import error
Date Fri, 28 Jul 2017 17:27:00 GMT


George Leslie-Waksman commented on AIRFLOW-1463:

This is a scheduler / executor bug.

The crux of the problem is that the celery executor maintains internal state about which
tasks it has sent to be executed. For certain task instance states, as recorded in the metadata
DB, the scheduler will not attempt to reschedule the task until the state has changed. If the
worker that picks up the task instance fails before it is able to update the task instance
state in the metadata DB, the task will get stuck in the "QUEUED" state.
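The deadlock can be illustrated with a toy model (not Airflow code; names are illustrative): progress requires both the executor's in-memory bookkeeping and the metadata DB row to change, and a silently dying worker changes neither.

```python
# Minimal model of the failure mode: the executor's in-memory "sent" set
# and the metadata DB state must both move forward for a task to progress.

class ToyExecutor:
    """Stands in for the celery executor's internal bookkeeping."""

    def __init__(self):
        self.sent = set()  # task keys already handed off to the broker

    def queue_task(self, key, db):
        if key in self.sent:
            return False  # executor believes the task is already queued; won't resend
        self.sent.add(key)
        db[key] = "QUEUED"
        return True


db = {}  # stands in for the task_instance table
executor = ToyExecutor()

executor.queue_task("task_a", db)
# A worker picks up task_a, fails on DAG import, and dies before
# updating the metadata DB: db["task_a"] stays "QUEUED".

# On the next scheduler loop, the executor refuses to resend:
resent = executor.queue_task("task_a", db)
print(resent, db["task_a"])  # False QUEUED -> stuck until the scheduler restarts
```

Restarting the scheduler works precisely because it throws away the `sent` set while the DB row survives.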

As a workaround, you can restart the scheduler to clear its internal state. In 1.8.1, the
scheduler's -r (run-duration) option can be used to restart it automatically at a regular
interval.

To fix this issue, the celery executor will need to be modified to: a) regularly clear
its internal state; b) synchronize its internal state with the state of the message queue;
or c) use the message queue state directly to determine what has been queued.
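Option (a) could be sketched roughly as follows (a hypothetical design sketch, not the actual executor code): entries in the internal state expire after a TTL, so a task whose worker died silently becomes eligible to be re-sent.

```python
import time

class SelfClearingExecutor:
    """Sketch of option (a): forget internal state older than `ttl` seconds so
    the scheduler can re-queue tasks whose workers died without reporting back."""

    def __init__(self, ttl=600.0):
        self.ttl = ttl
        self.sent = {}  # task key -> timestamp it was handed to the broker

    def queue_task(self, key, now=None):
        now = time.monotonic() if now is None else now
        # Drop entries older than ttl before deciding whether to resend.
        self.sent = {k: t for k, t in self.sent.items() if now - t < self.ttl}
        if key in self.sent:
            return False  # recently sent; assume it is still in flight
        self.sent[key] = now
        return True


ex = SelfClearingExecutor(ttl=600.0)
first = ex.queue_task("task_a", now=0.0)     # first send succeeds
second = ex.queue_task("task_a", now=60.0)   # within ttl: refused
third = ex.queue_task("task_a", now=700.0)   # ttl expired: re-sent
print(first, second, third)  # True False True
```

The trade-off is visible in the sketch: too short a TTL reintroduces duplicate sends for slow-but-alive tasks, which is why options (b) and (c), which consult the broker instead of guessing, are more robust.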

Your proposed fix may ameliorate the issue in your particular case, but it introduces a number
of race conditions around scheduler restarts that break some of the existing protections against
running the same task multiple times.

It might clarify matters to retitle this issue to something like "Scheduler does not
reschedule tasks in QUEUED state".

> Clear state of queued task when it fails due to DAG import error
> ----------------------------------------------------------------
>                 Key: AIRFLOW-1463
>                 URL:
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: cli
>         Environment: Ubuntu 14.04
> Airflow 1.8.0
> SQS backed task queue, AWS RDS backed meta storage
> DAG folder is synced by a script on code push: an archive is downloaded from S3, unpacked,
> and moved, and an install script is run. The airflow executable is replaced with a symlink
> pointing to the latest version of the code; no airflow processes are restarted.
>            Reporter: Stanislav Pak
>            Assignee: Stanislav Pak
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
> Our pipeline-related code is deployed almost simultaneously on all airflow boxes: the
> scheduler+webserver box and the worker boxes. A common python package is deployed on those
> boxes on every other code push (3-5 deployments per hour). Due to installation specifics,
> a DAG that imports a module from that package might fail to import. If the DAG import fails
> when a worker runs a task, the task is still removed from the queue but the task state is
> not changed, so in this case the task stays in the QUEUED state forever.
> Besides the described case, there is a scenario where this happens because of DAG update
> lag in the scheduler: a task can be scheduled with the old DAG, and a worker can run the
> task with the new DAG that fails to be imported.
> There might be other scenarios when it happens.
> Proposal:
> Catch errors when importing the DAG on task run, and clear the task instance state if the
> import fails. This should fix transient issues of this kind.
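The quoted proposal could be sketched on the worker side roughly as below (a hypothetical illustration; the function and attribute names are not Airflow's actual API): wrap the DAG import, and on failure reset the task instance state to None (unscheduled) so the scheduler picks it up again.

```python
# Illustrative sketch of the proposal (names are hypothetical, not Airflow's API):
# if the DAG cannot be imported when the worker runs the task, clear the task
# instance state instead of leaving it QUEUED forever.

def run_task(ti, import_dag, run):
    """ti: object with a mutable .state; import_dag and run are callables."""
    try:
        dag = import_dag()
    except ImportError:
        ti.state = None  # clear QUEUED so the scheduler will reschedule the task
        return False
    run(dag)
    ti.state = "SUCCESS"
    return True


class TI:
    state = "QUEUED"

ti = TI()

def broken_import():
    raise ImportError("DAG file failed to import")

ok = run_task(ti, broken_import, run=lambda dag: None)
print(ok, ti.state)  # False None
```

As the comment above notes, clearing state from the worker like this can race with scheduler restarts, so the sketch shows the mechanism of the proposal rather than a safe fix.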

This message was sent by Atlassian JIRA
