airflow-commits mailing list archives

From "Li Xuanji (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-434) max_dag_run_reached blocks dag state change and new task scheduling
Date Tue, 16 Aug 2016 23:12:20 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Li Xuanji updated AIRFLOW-434:
------------------------------
    Description: 
Using the following DAG:

```
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1, 1, 0),
    'email': ['xuanji@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('bash_bash_bash', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 120',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```

and an `airflow.cfg` that contains this:

```
min_file_process_interval = 1
```

The state eventually becomes this:

http://imgur.com/a/5bRTe

The scheduler should mark the 14 leftmost dagruns as success, but it does not. It should
also schedule tasks for the last two dagruns.

A look at the logs explains the probable cause:

```
[2016-08-16 15:12:10,257] {jobs.py:1446} DagFileProcessor174 INFO - Processing file /Users/xuanji_li/airflow/dags/bash_bash_bash.py for tasks to queue
[2016-08-16 15:12:10,258] {models.py:162} DagFileProcessor174 INFO - Filling up the DagBag from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
[2016-08-16 15:12:10,267] {jobs.py:1460} DagFileProcessor174 INFO - DAG(s) ['bash_bash_bash'] retrieved from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
[2016-08-16 15:12:10,289] {jobs.py:1062} DagFileProcessor174 INFO - Not processing DAG bash_bash_bash since its max runs has been reached
[2016-08-16 15:12:10,290] {models.py:313} DagFileProcessor174 INFO - Finding 'running' jobs without a recent heartbeat
[2016-08-16 15:12:10,290] {models.py:319} DagFileProcessor174 INFO - Failing jobs without heartbeat after 2016-08-16 15:09:55.290479
```

It seems that processing of the dagrun is skipped completely because there are already 16
running dagruns.
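
To illustrate the failure mode I suspect, here is a minimal, self-contained sketch. It is not Airflow's scheduler code: `DagRun`, `process_dag`, `make_runs`, `skip_when_full` and `MAX_ACTIVE_RUNS` are hypothetical names made up for this example, and the cap of 16 is assumed from the number of running dagruns observed above. It only shows how an early return on "max runs reached" could also prevent finished dagruns from ever being marked as success.

```python
from dataclasses import dataclass

MAX_ACTIVE_RUNS = 16  # assumed cap, taken from the 16 running dagruns seen above


@dataclass
class DagRun:
    """Hypothetical stand-in for a dagrun: its task states plus its own state."""
    task_states: list
    state: str = "running"

    def update_state(self):
        # A run counts as successful once every one of its tasks has succeeded.
        if all(s == "success" for s in self.task_states):
            self.state = "success"


def process_dag(runs, skip_when_full):
    """One scheduler pass over a DAG's runs; returns True if the DAG was processed."""
    active = [r for r in runs if r.state == "running"]
    if skip_when_full and len(active) >= MAX_ACTIVE_RUNS:
        # Suspected bug: returning here also skips the state updates below,
        # so finished runs stay 'running' and the cap is never cleared.
        return False
    for run in active:
        run.update_state()
    return True


def make_runs():
    # 14 runs whose three tasks have all finished, 2 runs still waiting on tasks.
    done = [DagRun(["success"] * 3) for _ in range(14)]
    pending = [DagRun(["success", "queued", "queued"]) for _ in range(2)]
    return done + pending


stuck = make_runs()
process_dag(stuck, skip_when_full=True)
print(sum(r.state == "running" for r in stuck))  # all 16 runs are still 'running'

fixed = make_runs()
process_dag(fixed, skip_when_full=False)
print(sum(r.state == "running" for r in fixed))  # only the 2 unfinished runs remain
```

In this sketch the cap should only stop new dagruns from being created; state updates and task scheduling for existing dagruns would need to happen regardless of whether the cap has been reached.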

A binary search over the commit history tracked the bug down to the pull request below. The
logic it adds looks wrong to me. https://github.com/apache/incubator-airflow/pull/1716



> max_dag_run_reached blocks dag state change and new task scheduling
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-434
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-434
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Li Xuanji
>            Assignee: Li Xuanji
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
