airflow-commits mailing list archives

From "Chris Riccomini (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
Date Thu, 12 May 2016 15:32:13 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281615#comment-15281615
] 

Chris Riccomini commented on AIRFLOW-106:
-----------------------------------------

[~sanand], in your last screenshot, why is the DAG run still showing as running even though
the sleep1 task has failed?

> Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>         Attachments: bug_works_if_not_first_task_that_fails.png, screenshot-1.png
>
>
> Hello.
> I created the following workflow:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>,
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var, so when this DAG launched it failed quickly. However, no
> failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any
> email either.
> Here are the logs:
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> On the DAG Runs page, the workflow is set as failed. On the task instance page, it is
> set as up_for_retry, but no new run is ever scheduled.
> I tried incrementing the retries parameter, but nothing different happens: Airflow never
> retries after the first run.
> dud
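
For reference, the retry behaviour the report expects can be sketched as a toy model. This is not Airflow code: `simulate_task` and its parameters are hypothetical names used only for illustration. It assumes that a task with `retries=1` gets two attempts separated by `retry_delay`, and that the failure notification fires only after the last attempt, since `email_on_retry` is False in the DAG above.

```python
from datetime import datetime, timedelta


def simulate_task(task_callable, retries, retry_delay, start, on_failure=None):
    """Toy model (NOT Airflow internals) of the retry semantics the report expects.

    Returns a list of (timestamp, attempt_number, state) tuples.
    """
    events = []
    now = start
    max_attempts = retries + 1  # "Starting attempt 1 of 2" when retries=1
    for attempt in range(1, max_attempts + 1):
        try:
            task_callable()
        except Exception as exc:
            if attempt < max_attempts:
                # Attempts remain: mark for retry and wait retry_delay.
                events.append((now, attempt, "up_for_retry"))
                now += retry_delay
            else:
                # Last attempt failed: mark failed and notify exactly once.
                events.append((now, attempt, "failed"))
                if on_failure is not None:
                    on_failure(exc)
        else:
            events.append((now, attempt, "success"))
            break
    return events


def missing_variable():
    # Stand-in for Variable.get('test_var') when the variable is undeclared.
    raise ValueError("Variable test_var does not exist")


notified = []
events = simulate_task(
    missing_variable,
    retries=1,
    retry_delay=timedelta(minutes=2),
    start=datetime(2016, 5, 11, 15, 55, 3),
    on_failure=notified.append,
)
for timestamp, attempt, state in events:
    print(timestamp, attempt, state)
```

In this model the first failure leaves the task in up_for_retry, a second attempt runs two minutes later, and only then is the task marked failed and the notification sent. The logs above stop after the first of those steps.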



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
