airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Siddharth Anand (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
Date Thu, 12 May 2016 03:06:12 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280591#comment-15280591
] 

Siddharth Anand edited comment on AIRFLOW-106 at 5/12/16 3:05 AM:
------------------------------------------------------------------

In a nutshell:

Task retries, on_failure callback, and email_on_failure are not honored if the first task
in a DAG fails. The bug presents itself when task retries are enabled and the first task in
a DAG run fails one time. If this happens, then retries are not honored on the task that only
failed one time. The task will be left in a permanent "UP_FOR_RETRY" state. As a result, failure
hooks will not be executed for that failed task including email notification that the task
failed.  



was (Author: sanand):
Hi [~adudczaAre you sure that emailing is actually working? 

The attempt to email won't be logged under your dag/task logs, but whereever you get system
logs. 

An easy way to test if email is working on your system is to set up the email operator and
then call it via "airflow test your_email_task 2016-05-11". The email config is in airflow.cfg.
-s

> Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in
a DAG Fails
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>
> Hello.
> I created the following workflow :
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var so when this DAG launched it failed quickly. However no
failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any
email.
> Here is the logs :
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator):
sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py",
line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py",
line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py",
line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py",
line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> In the DAG Runs page, the workflow is set as failed. In hte taks instance page, it is
set as up_for_retry but no new run is ever scheduled.
> I tried incrementing the retires parameter, but nothing different happens, Airflow never
retries after the first run.
> dud



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message