airflow-dev mailing list archives

From Bolke de Bruin <bdbr...@gmail.com>
Subject Re: Airflow 1.8 not honoring retry_delay (BLOCKER)
Date Thu, 12 Jan 2017 20:41:10 GMT
Ok, I figured it out. There are actually two bugs:

1. retry_delay is not honoured.
2. UP_FOR_RETRY tasks whose dag_run is marked failed are nevertheless executed.

The problem is a combination of 3 issues:

1. The dag_run is marked failed although a task is still within its retry_delay period.
2. A task does not check its own “retry_delay”, as the task is set to “queued” by the
scheduler and not to “up_for_retry”.
3. _execute_task_instances is called with “UP_FOR_RETRY” and “SCHEDULED”. This function
pulls tasks directly from the database and does not check dependencies.

I think (for now) the fix is to:

1. Set tasks that are up_for_retry and out of their retry period to “SCHEDULED”.
2. Update the call to _execute_task_instances to only allow “SCHEDULED” tasks (see the
sketch below).
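
A minimal sketch of what I mean, assuming the TaskInstance helpers behave as I remember
them (promote_retries_to_scheduled is an illustrative name, not part of any patch):

from airflow.utils.state import State

def promote_retries_to_scheduled(session, task_instances):
    # Move UP_FOR_RETRY tasks whose retry window has elapsed to SCHEDULED,
    # so the execution path only ever picks up SCHEDULED task instances.
    for ti in task_instances:
        # ready_for_retry() should be true once the retry_delay has passed
        if ti.state == State.UP_FOR_RETRY and ti.ready_for_retry():
            ti.state = State.SCHEDULED
            session.merge(ti)
    session.commit()

# ...and then restrict the states we pass down, along the lines of:
# self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))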

- Bolke

> On 12 Jan 2017, at 20:53, Bolke de Bruin <bdbruin@gmail.com> wrote:
> 
> While the dag_run is not marked as failed, and the task should not retry yet, it is
> nevertheless still scheduled.
> 
> Bolke
> 
> 
>> On 12 Jan 2017, at 20:16, Bolke de Bruin <bdbruin@gmail.com> wrote:
>> 
>> Hi All,
>> 
>> I further analysed this and it's an error with the dependency engine. In the current
>> state of master / 1.8, when updating the state of a DAG run, the “UP_FOR_RETRY” state is
>> not considered. This leads to a dag run that is marked deadlocked.
>> 
>> Applying the patch from https://github.com/apache/incubator-airflow/pull/1934,
>> particularly the “ignore_in_retry_period” part, the logs show that the retry_period is
>> being honoured, but eventually the dag run still gets marked deadlocked.
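>> 
>> What I think happens in update_state() is roughly this (a from-memory sketch, not the
>> exact code):
>> 
>> unfinished = [ti for ti in tis if ti.state in State.unfinished()]
>> # A task inside its retry window satisfies no dependency check, so to
>> # update_state() it looks as if nothing can run at all:
>> none_runnable = not any(ti.are_dependencies_met(session=session)
>>                         for ti in unfinished)
>> if unfinished and none_runnable:
>>     # this is the "Deadlock; marking run ... failed" branch in the logs below
>>     dag_run.state = State.FAILED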
>> 
>> These are the logs:
>> 
>> [2017-01-12 20:12:00,840] {jobs.py:801} DagFileProcessor262 INFO - Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>> [2017-01-12 20:12:00,846] {models.py:3865} DagFileProcessor262 INFO - Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> considering 2 task(s)
>> [2017-01-12 20:12:00,849] {not_in_retry_period_dep.py:30} DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify', u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state: up_for_retry
>> [2017-01-12 20:12:00,852] {jobs.py:844} DagFileProcessor262 INFO - Checking dependencies
>> [2017-01-12 20:12:00,862] {jobs.py:850} DagFileProcessor262 INFO - Done checking and queuing
>> [2017-01-12 20:12:00,862] {jobs.py:844} DagFileProcessor262 INFO - Checking dependencies
>> [2017-01-12 20:12:00,862] {not_in_retry_period_dep.py:30} DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify', u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state: up_for_retry
>> [2017-01-12 20:12:00,862] {models.py:996} DagFileProcessor262 INFO - Prem: State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now: 2017-01-12 20:12:00.862789
>> [2017-01-12 20:12:00,862] {models.py:1109} DagFileProcessor262 INFO - State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now: 2017-01-12 20:12:00.862928
>> [2017-01-12 20:12:00,863] {jobs.py:850} DagFileProcessor262 INFO - Done checking and queuing
>> /Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/SQLAlchemy-1.1.4-py2.7-macosx-10.12-x86_64.egg/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance.
>> [2017-01-12 20:12:00,879] {models.py:321} DagFileProcessor262 INFO - Finding 'running' jobs without a recent heartbeat
>> [2017-01-12 20:12:00,879] {models.py:327} DagFileProcessor262 INFO - Failing jobs without heartbeat after 2017-01-12 20:07:00.879746
>> [2017-01-12 20:12:00,886] {jobs.py:331} DagFileProcessor262 INFO - Processing /Users/bolke/airflow/dags/retry_delay.py took 0.095 seconds
>> 
>> 
>> [2017-01-12 20:12:05,962] {jobs.py:323} DagFileProcessor272 INFO - Started process (PID=25181) to work on /Users/bolke/airflow/dags/retry_delay.py
>> [2017-01-12 20:12:05,975] {jobs.py:1483} DagFileProcessor272 INFO - Processing file /Users/bolke/airflow/dags/retry_delay.py for tasks to queue
>> [2017-01-12 20:12:05,975] {models.py:168} DagFileProcessor272 INFO - Filling up the DagBag from /Users/bolke/airflow/dags/retry_delay.py
>> [2017-01-12 20:12:05,982] {jobs.py:1497} DagFileProcessor272 INFO - DAG(s) ['test_retry_handling_job_spotify'] retrieved from /Users/bolke/airflow/dags/retry_delay.py
>> [2017-01-12 20:12:06,001] {jobs.py:1109} DagFileProcessor272 INFO - Processing test_retry_handling_job_spotify
>> [2017-01-12 20:12:06,008] {jobs.py:801} DagFileProcessor272 INFO - Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>> [2017-01-12 20:12:06,014] {models.py:3865} DagFileProcessor272 INFO - Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> considering 2 task(s)
>> [2017-01-12 20:12:06,021] {models.py:3914} DagFileProcessor272 INFO - Deadlock; marking run <DagRun test_retry_handling_job_spotify @ 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> failed
>> 
>> I consider this a blocker for 1.8 and I could use some help in solving the issue.
>> 
>> Bolke.
>> 
>>> On 11 Jan 2017, at 21:45, Bolke de Bruin <bdbruin@gmail.com> wrote:
>>> 
>>> First analysis:
>>> 
>>> The dependency checker (at least the retry_delay checker) is not called in the
>>> scheduler for some reason. As the task leaves the scheduler, its state is set to
>>> “queued”, which won’t match the “is_premature” requirement of “UP_FOR_RETRY”.
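>>> 
>>> For reference, is_premature on the TaskInstance is (if I remember correctly) roughly
>>> this property, so a task the scheduler has already flipped to “queued” never
>>> registers as premature:
>>> 
>>> @property
>>> def is_premature(self):
>>>     # only an UP_FOR_RETRY task still inside its retry window is premature
>>>     return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()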
>>> 
>>> @Dan: are you able to help out here? 
>>> 
>>> Bolke
>>> 
>>>> On 11 Jan 2017, at 18:31, Harvey Xia <harveyxia@spotify.com.INVALID> wrote:
>>>> 
>>>> Hi Bolke,
>>>> 
>>>> Here is the JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-747
>>>> 
>>>> 
>>>> Harvey Xia | Software Engineer
>>>> harveyxia@spotify.com
>>>> +1 (339) 225 1875
>>>> 
>>>> On Wed, Jan 11, 2017 at 11:32 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:
>>>> 
>>>>> Hi Harvey,
>>>>> 
>>>>> Thanks for reporting! Can you create a JIRA for this? I’ll have a look if I can
>>>>> reproduce it.
>>>>> 
>>>>> - Bolke
>>>>> 
>>>>>> On 11 Jan 2017, at 16:06, Harvey Xia <harveyxia@spotify.com.INVALID> wrote:
>>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> In Airflow 1.8 alpha 2, using LocalExecutor, DAGs do not seem to honor the
>>>>>> retry_delay parameter, i.e. the retries happen immediately one after the other
>>>>>> without waiting the specified retry_delay time. However, the *number* of retries
>>>>>> is honored. I am testing with the following code:
>>>>>> 
>>>>>> from airflow import DAG
>>>>>> from airflow.operators.bash_operator import BashOperator
>>>>>> from datetime import datetime, timedelta
>>>>>> 
>>>>>> default_args = {
>>>>>>     'owner': 'airflow',
>>>>>>     'depends_on_past': False,
>>>>>>     'start_date': datetime(2016, 10, 5, 19),
>>>>>>     'end_date': datetime(2016, 10, 6, 19),
>>>>>>     'email': ['airflow@airflow.com'],
>>>>>>     'email_on_failure': False,
>>>>>>     'email_on_retry': False,
>>>>>>     'retries': 10,
>>>>>>     'retry_delay': timedelta(0, 500)  # timedelta(days, seconds): 500s between retries
>>>>>> }
>>>>>> 
>>>>>> dag = DAG('test_retry_handling_job', default_args=default_args,
>>>>>>           schedule_interval='@once')
>>>>>> 
>>>>>> task1 = BashOperator(
>>>>>>     task_id='test_retry_handling_op1',
>>>>>>     bash_command='exit 1',
>>>>>>     dag=dag)
>>>>>> 
>>>>>> task2 = BashOperator(
>>>>>>     task_id='test_retry_handling_op2',
>>>>>>     bash_command='exit 1',
>>>>>>     dag=dag)
>>>>>> 
>>>>>> task2.set_upstream(task1)
>>>>>> 
>>>>>> Let me know if anyone has any ideas about this issue, thanks!
>>>>>> 
>>>>>> Harvey Xia | Software Engineer
>>>>>> harveyxia@spotify.com
>>>>>> +1 (339) 225 1875
>>>>> 
>>>>> 
>>> 
>> 
> 

