airflow-commits mailing list archives

From "Jeremiah Lowin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-47) ExternalTaskSensor causes scheduling dead lock
Date Wed, 18 May 2016 16:32:13 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15289257#comment-15289257 ]

Jeremiah Lowin commented on AIRFLOW-47:
---------------------------------------

Hi Hila,

I'm glad priority_weight has solved this for you. From that, I take it your issue was simply
not being able to run enough tasks at once, and that using priority weights makes sure the
hourly tasks run first?

My understanding was that you had an hourly DAG that runs 24 times each day, and also 1 daily
DAG, and wanted to make sure that the daily DAG ran AFTER all of the hourly DAG runs on each day.
To guarantee that, I think you need 24 external task sensors, one for each hour, rather than
just one that only looks at the last hour. For example, with a single sensor the 9:00am hourly
run could fail and your daily DAG would still run as long as the 11:00pm hourly run succeeded.
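
As a rough sketch of the 24-sensor idea (not tested against Airflow 1.7.0), the snippet below
only builds the keyword arguments for the sensors using the standard library. It assumes that
the daily run with execution date D should wait for the hourly runs D+0h through D+23h, and
that ExternalTaskSensor pokes for execution_date minus execution_delta, so the deltas are
negative. The task_id pattern and the helper names are made up for illustration.

```python
from datetime import datetime, timedelta

HOURS_PER_DAY = 24


def hourly_sensor_kwargs(external_dag_id, external_task_id):
    """Build keyword arguments for 24 sensors, one per hourly run of a day.

    Assumption: the daily run with execution date D waits for the hourly
    runs with execution dates D+0h .. D+23h, and the sensor looks at
    (execution_date - execution_delta), hence the negative deltas.
    """
    kwargs_list = []
    for hour in range(HOURS_PER_DAY):
        kwargs_list.append({
            "task_id": "wait_for_hour_{:02d}".format(hour),  # hypothetical name
            "external_dag_id": external_dag_id,
            "external_task_id": external_task_id,
            "execution_delta": timedelta(hours=-hour),
        })
    return kwargs_list


def poked_dates(daily_execution_date, kwargs_list):
    """Return the hourly execution dates the sensors would look at."""
    return [daily_execution_date - kw["execution_delta"] for kw in kwargs_list]


sensor_kwargs = hourly_sensor_kwargs("1_hourly_agg", "print_hourly1")
dates = poked_dates(datetime(2016, 4, 2), sensor_kwargs)

# In the daily DAG file, each dict would then be passed along the lines of:
#     ExternalTaskSensor(dag=dag, **kw)
```

If the daily run should instead wait for the previous day's hourly runs, the deltas would need
to be shifted accordingly.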

To use XComs, you would have each hourly task return a value when it completes (maybe just
{{True}}) and then query for all 24 values in your daily DAG ({{xcom.pull(task_id=hourly_id_1,
execution_date=hourly_execution_date)}}, and so on for each hour). If all 24 are True, proceed.
In other words, you request the XCom value for a specific execution date, not just the most
recent one (which is the default, as you said).
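
The "all 24 are True" check could look roughly like the sketch below. It deliberately does not
call Airflow: {{pull}} is a hypothetical callable taking (task_id, execution_date) that stands
in for whatever XCom lookup you use, and the dictionary is a fake store for demonstration only.

```python
from datetime import datetime, timedelta


def hourly_execution_dates(day_start):
    """Return the 24 hourly execution dates starting at day_start."""
    return [day_start + timedelta(hours=h) for h in range(24)]


def all_hourly_runs_succeeded(pull, task_id, day_start):
    """Return True only if every one of the 24 hourly values is exactly True.

    `pull` is a hypothetical callable (task_id, execution_date) -> value that
    stands in for the XCom lookup; a missing value is expected to be None.
    """
    return all(
        pull(task_id, execution_date) is True
        for execution_date in hourly_execution_dates(day_start)
    )


# Stand-in for the XCom store: the 09:00 run never pushed a value.
day = datetime(2016, 4, 2)
fake_store = {("print_hourly1", d): True for d in hourly_execution_dates(day)}
del fake_store[("print_hourly1", day + timedelta(hours=9))]


def fake_pull(task_id, execution_date):
    """Look up a value in the fake store, returning None when it is absent."""
    return fake_store.get((task_id, execution_date))


ready = all_hourly_runs_succeeded(fake_pull, "print_hourly1", day)
```

Here {{ready}} is False because one hour is missing, which is the case a single sensor on the
last hour would not catch.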

> ExternalTaskSensor causes scheduling dead lock
> ----------------------------------------------
>
>                 Key: AIRFLOW-47
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-47
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: Airflow 1.7.0
>         Environment: CentOS 6.5
> Airflow 1.7.0 with SequentialExecutor
>            Reporter: Hila Visan
>         Attachments: screenshot-1.png
>
>
> We are trying to use 'ExternalTaskSensor' to coordinate between a daily DAG and an hourly
> DAG (daily DAGs depend on hourly).
> Relevant code: 
> *Daily DAG definition:*
> {code:title=2_daily_dag.py|borderStyle=solid}
> default_args = {
>     …
>     'start_date': datetime(2016, 4, 2),
>     …
> }
> dag = DAG(dag_id='2_daily_agg', default_args=default_args, schedule_interval="@daily")
> ext_dep = ExternalTaskSensor(
>     external_dag_id='1_hourly_agg',
>     external_task_id='print_hourly1',
>     task_id='evening_hours_sensor',
>     dag=dag)
> {code}
> *Hourly DAG definition:*
> {code:title=1_hourly_dag.py|borderStyle=solid}
> default_args = {
>     …
>     'start_date': datetime(2016, 4, 1),
>     …
> }
> dag = DAG(dag_id='1_hourly_agg', default_args=default_args, schedule_interval="@hourly")
> t1 = BashOperator(
>     task_id='print_hourly1',
>     bash_command='echo hourly job1',
>     dag=dag)
> {code}
> The hourly dag was executed twice for the following execution dates:
> 04-01T00:00:00	
> 04-01T01:00:00
> Then the daily DAG was executed, and it is still running.
> According to the logs, the daily DAG is waiting for the hourly DAG to complete:
> {noformat}
> [2016-05-04 06:01:20,978] {models.py:1041} INFO - Executing<Task(ExternalTaskSensor): evening_hours_sensor> on 2016-04-03 00:00:00
> [2016-05-04 06:01:20,984] {sensors.py:188} INFO - Poking for 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ...
> [2016-05-04 06:02:21,053] {sensors.py:188} INFO - Poking for 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ...
> {noformat}
> How can I resolve this deadlock?
> In addition, I didn't understand whether this means that the daily DAG depends only on the
> "last" hourly DAG run of the same day (23:00-24:00)?
> What happens if the hourly DAG run for another hour fails?
> Thanks a lot! 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
