airflow-commits mailing list archives

From "Li Xuanji (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-434) max_dag_run_reached blocks dag state change and new task scheduling
Date Tue, 16 Aug 2016 22:10:20 GMT
Li Xuanji created AIRFLOW-434:
---------------------------------

             Summary: max_dag_run_reached blocks dag state change and new task scheduling
                 Key: AIRFLOW-434
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-434
             Project: Apache Airflow
          Issue Type: Bug
            Reporter: Li Xuanji
            Assignee: Li Xuanji
            Priority: Blocker


Using the following DAG:

```
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1, 1, 0),
    'email': ['xuanji@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('bash_bash_bash', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 120',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```
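A rough count shows how quickly runs pile up under this schedule: with a `start_date` far in the past and a 10-second `schedule_interval`, the scheduler has millions of intervals to backfill, so any cap on concurrently running dagruns is hit almost immediately. The cutoff date below is illustrative, chosen to roughly match the report's timestamps:

```python
from datetime import datetime, timedelta

# Number of 10-second schedule intervals between the DAG's start_date
# and an arbitrary "now" around the time of the report (illustrative).
start = datetime(2016, 1, 1, 1, 0)
now = datetime(2016, 8, 16, 15, 0)
interval = timedelta(seconds=10)

backlog = int((now - start) / interval)  # intervals awaiting a dagrun
print(backlog)  # on the order of two million intervals
```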

The state eventually becomes this:

http://imgur.com/a/5bRTe

The scheduler should be marking the 14 leftmost dagruns as success, but does not. The scheduler
should also be scheduling tasks for the last two dagruns.

A look at the logs suggests the probable cause:

```
[2016-08-16 15:12:10,257] {jobs.py:1446} DagFileProcessor174 INFO - Processing file /Users/xuanji_li/airflow/dags/bash_bash_bash.py for tasks to queue
[2016-08-16 15:12:10,258] {models.py:162} DagFileProcessor174 INFO - Filling up the DagBag from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
[2016-08-16 15:12:10,267] {jobs.py:1460} DagFileProcessor174 INFO - DAG(s) ['bash_bash_bash'] retrieved from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
[2016-08-16 15:12:10,289] {jobs.py:1062} DagFileProcessor174 INFO - Not processing DAG bash_bash_bash since its max runs has been reached
[2016-08-16 15:12:10,290] {models.py:313} DagFileProcessor174 INFO - Finding 'running' jobs without a recent heartbeat
[2016-08-16 15:12:10,290] {models.py:319} DagFileProcessor174 INFO - Failing jobs without heartbeat after 2016-08-16 15:09:55.290479
```

It seems that processing of the DAG is skipped entirely because there are already 16 running
dagruns (matching the default `max_active_runs_per_dag` of 16). Since the skipped pass is also
the one that would examine the running dagruns and mark finished ones as success, the active-run
count can never drop back below the cap.
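The deadlock can be sketched in miniature. The names below (`DagRun`, `process_dag`) and their shapes are an illustrative model, not Airflow's actual scheduler internals; only the 16-run cap and the skipped-DAG behavior come from the log above:

```python
RUNNING, SUCCESS = "running", "success"

class DagRun:
    """Minimal stand-in for a dagrun: a state flag plus whether all of
    its task instances have already finished."""
    def __init__(self, tasks_done):
        self.state = RUNNING
        self.tasks_done = tasks_done

def process_dag(dag_runs, max_active_runs=16):
    """Illustrative scheduling pass. Returns the runs it examined."""
    active = [r for r in dag_runs if r.state == RUNNING]
    if len(active) >= max_active_runs:
        # Mirrors "Not processing DAG ... since its max runs has been
        # reached": the early return skips the DAG entirely, so runs
        # whose tasks are all done are never flipped to SUCCESS, the
        # active count never drops below the cap, and no new tasks are
        # ever scheduled -- a deadlock.
        return []
    for run in active:
        if run.tasks_done:
            run.state = SUCCESS
    return active

# 14 runs are finished but still marked running; 2 still have work left.
runs = [DagRun(tasks_done=True) for _ in range(14)] + \
       [DagRun(tasks_done=False) for _ in range(2)]
assert process_dag(runs) == []                # whole DAG skipped
assert all(r.state == RUNNING for r in runs)  # nothing marked success
```

Under this model, marking the finished runs as success *before* applying the cap (or checking the cap only when creating new dagruns) would let the scheduler make progress.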



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
