airflow-commits mailing list archives

From "Li Xuanji (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-434) max_dag_run_reached blocks dag state change and new task scheduling
Date Fri, 19 Aug 2016 02:56:20 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427521#comment-15427521 ]

Li Xuanji commented on AIRFLOW-434:
-----------------------------------

I think new task scheduling should be allowed as long as it is on an already active dagrun.
So, in the example of http://imgur.com/a/5bRTe, the tasks on the right can be scheduled because
scheduling them will not bring us over 16 running dagruns.
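
Concretely, the rule I have in mind is something like the sketch below (illustrative pseudocode only, with made-up helper names, not the actual jobs.py code):

```
# Sketch of the behaviour I'd expect (helper names are invented for illustration):
# max_active_runs should only gate the creation of *new* dag runs, while runs
# that are already active keep getting their tasks scheduled.
def process_dag(dag, active_runs):
    if len(active_runs) < dag.max_active_runs:
        create_new_dag_run(dag)           # hypothetical helper: start another run

    for run in active_runs:               # never skipped: existing runs can
        schedule_runnable_tasks(run)      # still make progress and eventually
        update_dag_run_state(run)         # be marked success/failed
```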

Otherwise, we could end up in a situation where, for example, we have 16 running dagruns, each with
two tasks: one task has state success, the other has no state. We would need to schedule one
of the tasks with no state; otherwise we will be stuck in that state forever.
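
To make the stuck state concrete, here is a toy simulation (plain Python, not Airflow code; the structure is invented just to illustrate the counting argument):

```
# Toy model: 16 "running" dagruns, each with one finished task and one task
# that has no state yet. If the scheduler refuses to process the DAG whenever
# the active-run count is at the limit, no task is ever scheduled, so no run
# can ever finish, and the count never drops below the limit.
MAX_ACTIVE_RUNS = 16
dag_runs = [{"t1": "success", "t2": None} for _ in range(MAX_ACTIVE_RUNS)]

def is_running(run):
    return any(state != "success" for state in run.values())

def scheduler_pass(runs):
    if sum(is_running(r) for r in runs) >= MAX_ACTIVE_RUNS:
        return  # current behaviour: skip the DAG entirely
    for run in runs:
        for task, state in run.items():
            if state is None:
                run[task] = "success"  # pretend we scheduled and ran the task

for _ in range(100):          # no number of scheduler passes helps
    scheduler_pass(dag_runs)

print(all(is_running(r) for r in dag_runs))  # True: permanently stuck
```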

> max_dag_run_reached blocks dag state change and new task scheduling
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-434
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-434
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Li Xuanji
>            Assignee: Siddharth Anand
>            Priority: Blocker
>
> Using the following DAG:
> ```
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 1, 1, 1, 0),
>     'email': ['xuanji@gmail.com'],
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 3,
>     'retry_delay': timedelta(minutes=1),
> }
> dag = DAG('bash_bash_bash', default_args=default_args, schedule_interval=timedelta(seconds=10))
> # t1, t2 and t3 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>     task_id='print_date',
>     bash_command='date',
>     dag=dag)
> t2 = BashOperator(
>     task_id='sleep',
>     bash_command='sleep 120',
>     retries=3,
>     dag=dag)
> templated_command = """
> {% for i in range(5) %}
> echo "{{ ds }}"
> echo "{{ macros.ds_add(ds, 7)}}"
> echo "{{ params.my_param }}"
> {% endfor %}
> """
> t3 = BashOperator(
>     task_id='templated',
>     bash_command=templated_command,
>     params={'my_param': 'Parameter I passed in'},
>     dag=dag)
> t2.set_upstream(t1)
> t3.set_upstream(t1)
> ```
> and an `airflow.cfg` that contains this:
> ```
> min_file_process_interval = 1
> ```
> The state eventually becomes this:
> http://imgur.com/a/5bRTe
> The scheduler should be marking the 14 leftmost dagruns as success, but does not. The scheduler should also be scheduling tasks for the last two dagruns.
> A look at the logs explains the probable cause:
> ```
> [2016-08-16 15:12:10,257] {jobs.py:1446} DagFileProcessor174 INFO - Processing file /Users/xuanji_li/airflow/dags/bash_bash_bash.py for tasks to queue
> [2016-08-16 15:12:10,258] {models.py:162} DagFileProcessor174 INFO - Filling up the DagBag from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
> [2016-08-16 15:12:10,267] {jobs.py:1460} DagFileProcessor174 INFO - DAG(s) ['bash_bash_bash'] retrieved from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
> [2016-08-16 15:12:10,289] {jobs.py:1062} DagFileProcessor174 INFO - Not processing DAG bash_bash_bash since its max runs has been reached
> [2016-08-16 15:12:10,290] {models.py:313} DagFileProcessor174 INFO - Finding 'running' jobs without a recent heartbeat
> [2016-08-16 15:12:10,290] {models.py:319} DagFileProcessor174 INFO - Failing jobs without heartbeat after 2016-08-16 15:09:55.290479
> ```
> It seems that processing of the dagrun is skipped completely because there are already 16 running dagruns.
> Binary search tracked down this commit as the one that introduced the bug. The logic added looks wrong to me. https://github.com/apache/incubator-airflow/pull/1716



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
