airflow-commits mailing list archives

From "Bjorn Olsen (Jira)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
Date Fri, 06 Dec 2019 19:44:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990119#comment-16990119 ]

Bjorn Olsen commented on AIRFLOW-6190:
--------------------------------------

Thanks for the feedback guys. It does seem similar.

Could you take a look at the fix I found and let me know what you think?
I'm no scheduler expert, so a review from someone who knows this code would be great.
[https://github.com/baolsen/airflow/pull/10/files]

Basically I changed the scheduler so it no longer does SCHEDULED -> QUEUED -> SCHEDULED
immediately when a task was only just queued and the executor has not yet had a chance to look
at it. This happens when the executor's previous task has completed but the executor is still
waiting in its heartbeat loop, and meanwhile the scheduler thinks it can queue up new tasks.
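
The gist of the change, as a rough sketch only (hypothetical names and grace period; the
actual diff is in the PR above): when resetting stale QUEUED task instances back to
SCHEDULED, skip any task that was queued within roughly one heartbeat interval, so the
executor gets at least one chance to see it first.

{code:python}
from datetime import timedelta

from airflow.utils import timezone

# Assumed grace period of about one executor heartbeat (illustrative value,
# not the one used in the PR).
QUEUE_GRACE_PERIOD = timedelta(seconds=5)


def tis_old_enough_to_reset(queued_tis):
    """Return only the QUEUED task instances that have been queued long enough
    that the executor must already have had a chance to pick them up."""
    now = timezone.utcnow()
    return [
        ti for ti in queued_tis
        if ti.queued_dttm and now - ti.queued_dttm > QUEUE_GRACE_PERIOD
    ]
{code}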

Do you think your changes would avoid this scenario or could it still happen?

> Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
> ----------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6190
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.10.6
>            Reporter: Bjorn Olsen
>            Assignee: Bjorn Olsen
>            Priority: Minor
>         Attachments: image-2019-12-06-13-55-33-974.png
>
>
> The DAG below creates 20 identical simple tasks that depend on each other in series.
> Installing the DAG and executing all the DAG runs works perfectly the first time around.
> Then pausing the DAG, clearing all the DAG runs, and unpausing the DAG leads to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to either randomly affect an entire set of DAG runs or leave the set untouched. This makes me suspect a timing issue between the executor and the scheduler (sometimes they align and sometimes they don't).
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.python_operator import PythonOperator
>
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
>
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1,
> )
>
> def func():
>     pass
>
> # Chain 20 trivial tasks in series: task_0 >> task_1 >> ... >> task_19
> prev_task = None
> for i in range(20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
>     prev_task = task
>
> if __name__ == "__main__":
>     dag.cli()
> {code}
> I am using the LocalExecutor, with:
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
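> For reference, these settings live in airflow.cfg roughly as follows (section names as in Airflow 1.10; only the values above come from my setup):
> {code}
> [core]
> executor = LocalExecutor
>
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> {code}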
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second-attempt tasks have two logs shown on the UI if they were successful, and two physical log files on disk. However, the tasks that failed have only one log shown on the UI, despite there being two physical log files on disk. (Presumably the UI reads from the Airflow DB, which for some reason isn't aware of the second log for the failed tasks.)
>  
> Anyway, I am more interested in the intermittent failures than in which logs are shown on the UI.
> Here is an example of the second log file for the failed task attempts:
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the previous TaskInstance (there is a delay between when a TaskInstance's state is set to SUCCESS and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can go SCHEDULED -> QUEUED, but the task does not start because the worker is still busy, so it goes back QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually tries to start it.
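> To make the suspected sequence concrete, here is a simplified model of the race (illustrative only; the state names match Airflow's but this is not Airflow code):
> {code:python}
> SCHEDULED, QUEUED = 'scheduled', 'queued'
>
> ti_state = SCHEDULED
>
> # 1. The scheduler queues the task and hands it to the executor's queue.
> ti_state = QUEUED
>
> # 2. The worker is still sleeping in its heartbeat loop after the previous
> #    task, so the queued task sits in the executor queue without starting.
>
> # 3. The scheduler sees a QUEUED task that is not running and resets it.
> ti_state = SCHEDULED
>
> # 4. The worker finally dequeues the task, but the 'Task Instance State'
> #    dependency requires a runnable state, so the check fails with the
> #    "Task is in the 'scheduled' state" message from the log above.
> if ti_state != QUEUED:
>     print("Dependencies not met: task is in the '%s' state" % ti_state)
> {code}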
> However, what is a mystery to me is why it works the first time the dag_runs execute and not the second time. Perhaps it is something specific to my environment.
> I'm going to try to debug this myself, but if anyone else can replicate this issue in their environment it would help me understand whether it affects only me.
> Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures).
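> A possible CLI sequence for the repro (Airflow 1.10 commands; adjust the date range to your environment):
> {code}
> airflow unpause bug_testing_dag    # let all runs complete once, then:
> airflow pause bug_testing_dag
> airflow clear bug_testing_dag -s 2019-12-01 -e 2019-12-06 -c    # -c skips the confirmation prompt
> airflow unpause bug_testing_dag    # random failures should now appear
> {code}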
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
