airflow-commits mailing list archives

From "Siddharth Anand (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-106) email_on_failure isn't being triggered because dag FAILED before task_retries execute
Date Thu, 12 May 2016 02:57:12 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281136#comment-15281136 ]

Siddharth Anand edited comment on AIRFLOW-106 at 5/12/16 2:56 AM:
------------------------------------------------------------------

Thanks for reporting this. It's a serious bug, and I have escalated it to a blocker. I've assigned it to myself to help spread knowledge of the scheduler changes among the committers.

On Apr 4, the following commit was made to handle deadlocks.
https://github.com/apache/incubator-airflow/commit/2e0421a28347de9a24bb14f37d33988c50b901b2

Here's what is happening:
* Your task is run in TaskInstance.run() in models.py
* When it fails, it calls self.handle_failure(e, test_mode, context) in the Exception block
* Since self.try_num == 1 and task.retries == 1, the task is placed in the UP_FOR_RETRY state
* The scheduler (in jobs.py) next runs process_dag(self, dag, queue)
* Since the task is UP_FOR_RETRY and its retry interval has not yet passed, the task is added to a could_not_run set
* Because len(could_not_run) == len(descartes), the DAG is incorrectly deemed to be in a deadlocked state and the DagRun is immediately marked FAILED (see the sketch below)
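
To make the failure mode concrete, here is a minimal, self-contained sketch of that deadlock check. It is illustrative only, not the actual jobs.py code: the names could_not_run and descartes mirror the scheduler, while TaskInstanceStub and process_dag_stub are hypothetical stand-ins.

{code}
# Hypothetical, simplified model of the scheduler's per-DagRun deadlock check.
from datetime import datetime, timedelta

UP_FOR_RETRY = "up_for_retry"
FAILED = "failed"

class TaskInstanceStub(object):
    def __init__(self, task_id, state, end_date, retry_delay):
        self.task_id = task_id
        self.state = state
        self.end_date = end_date        # when the last attempt finished
        self.retry_delay = retry_delay

    def ready_for_retry(self):
        # True only once retry_delay has elapsed since the last attempt ended.
        return (self.state == UP_FOR_RETRY and
                self.end_date + self.retry_delay <= datetime.utcnow())

def process_dag_stub(descartes):
    """descartes: the candidate TaskInstances of the DagRun (name taken from jobs.py)."""
    could_not_run = set()
    for ti in descartes:
        if ti.state == UP_FOR_RETRY and not ti.ready_for_retry():
            # The task is merely waiting out its retry_delay, yet it still
            # lands in could_not_run...
            could_not_run.add(ti)
    # ...and the check cannot tell "waiting for a retry" from "deadlocked":
    if len(could_not_run) == len(descartes):
        return FAILED  # the whole DagRun is failed immediately -- the bug
    return "running"

# A single task that just failed its first try with retry_delay=2 minutes:
ti = TaskInstanceStub("sleep1", UP_FOR_RETRY, datetime.utcnow(), timedelta(minutes=2))
print(process_dag_stub([ti]))  # -> "failed", even though a retry is still owed
{code}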

There are a few problems with this:
1. The retries are not honored.
2. The DagRun is FAILED, but its TaskInstances are left in a variety of states (e.g. UP_FOR_RETRY or even blank).
3. Because the TaskInstance is not allowed to fail until its retries are exhausted, it never gets to the point of being a failed TaskInstance -- and email_on_failure is called on failed TaskInstances, not on failed DagRuns!

When does this bug not occur?
* If the failing TaskInstance is not the first TaskInstance in the DAG, this bug does not present itself. Though, I still need to verify whether the DagRun will be left in the RUNNING state or correctly FAILED.
* If the user disables retries, this bug does not occur: the very first time into the handle_failure() method, the email_on_failure notification is sent (see the sketch below).
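
Here is a similarly hypothetical sketch of the handle_failure() branching (not the real models.py code). The point is that the notification is only reached when no retries remain, so a DagRun that is failed while its task sits in UP_FOR_RETRY never sends it.

{code}
# Hypothetical model of handle_failure(): retry first, email only on final failure.
def handle_failure_stub(try_number, retries, email_on_failure, send_email):
    if try_number <= retries:
        # Retries remain: park the task in UP_FOR_RETRY and return, no email.
        return "up_for_retry"
    # Out of retries: this is a real task failure, so the notification fires.
    if email_on_failure:
        send_email("Task failed")
    return "failed"

# retries=1, first attempt: no email, the task waits for its retry -- and the
# scheduler then fails the whole DagRun as described above.
print(handle_failure_stub(1, 1, True, print))  # -> "up_for_retry"

# retries=0: the very first failure is final, so the email goes out.
print(handle_failure_stub(1, 0, True, print))  # -> prints "Task failed", then "failed"
{code}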

cc [~jlowin][~aoen]

Other notes: this is a blocker bug because production Airflow deployments often depend on email failure notification via the *email_on_failure* hook. Additionally, retries are required to make data pipelines fault-tolerant. Both are expected to work in Airflow.



> email_on_failure isn't being triggered because dag FAILED before task_retries execute
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>
> Hello.
> I created the following workflow:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>,
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var, so when this DAG launched it failed quickly. However, no failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any email.
> Here are the logs:
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> In the DAG Runs page, the workflow is marked as failed. In the task instance page, it is set as up_for_retry, but no new run is ever scheduled.
> I tried incrementing the retries parameter, but nothing different happens; Airflow never retries after the first run.
> dud


