airflow-commits mailing list archives

From "Iuliia Volkova (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-3182) 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
Date Thu, 15 Nov 2018 12:55:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687993#comment-16687993 ]

Iuliia Volkova commented on AIRFLOW-3182:
-----------------------------------------

[~Zeckt], I understand your case: after the hourly tasks you need one more branch task, which
checks whether it is already hour 23 or not. This branch task needs to have all the hourly
tasks upstream of it, and your aggregation task should be set downstream of it.
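The extra branching step can be sketched as a plain callable; a minimal illustration, where `check_if_last_hour`, `daily_aggregation`, and `skip_aggregation` are hypothetical names, not taken from the DAG in the issue:

```python
# Hypothetical gating callable for an extra BranchPythonOperator placed
# downstream of every hourly task. It routes to the daily aggregation
# only after the hour-23 run; all earlier hours end in a no-op task.
# Task ids here are illustrative, not from the reported DAG.
def check_if_last_hour(current_hour):
    if current_hour == 23:
        return 'daily_aggregation'
    return 'skip_aggregation'
```

In the DAG, this callable would back a `BranchPythonOperator` with `trigger_rule='all_done'` so it runs regardless of whether the hourly task succeeded or failed.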

> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Greg H
>            Priority: Major
>         Attachments: BrannchPythonOperator.png, Screen Shot 2018-11-15 at 13.51.07.png
>
>
> We have a job that runs some data processing every hour. At the end of the day we need
to run aggregation on all data generated by the 'hourly' jobs, regardless if any 'hourly'
job failed or not. For this purpose we have prepared DAG that uses BranchPythonOperator in
order to decide which 'hourly' job needs to be run in given time and when task for hour 23
is done, we trigger the aggregation (downstream). For this to work regardless of the last
'hourly' task status the *'all_done'* trigger rule is set in the aggregation task. Unfortunately,
such configuration works incorrectly causing aggregation task to be run after every 'hourly'
task, despite the fact the aggregation task is set as downstream for 'task_for_hour-23' +only+:
>   !BrannchPythonOperator.png!
> Here's sample code:
> {code:java}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from datetime import datetime
> import logging
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
> # Setting the current hour
> def get_current_hour():
>     return datetime.now().hour
> # Returns the name id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>     
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
> start_task.set_downstream(hour_branching)
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419
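The behaviour reported above is consistent with the 'all_done' rule counting a *skipped* upstream task as finished: when the branch picks any hour other than 23, task_for_hour-23 is skipped, which still satisfies the aggregation task's trigger rule. An illustrative sketch of that semantics (not Airflow's actual implementation):

```python
# Illustrative sketch of 'all_done' semantics: any terminal state,
# including 'skipped', counts as done. Not Airflow's real code.
TERMINAL_STATES = {'success', 'failed', 'upstream_failed', 'skipped'}

def all_done_satisfied(upstream_states):
    # The rule fires once every upstream task reached a terminal state.
    return all(state in TERMINAL_STATES for state in upstream_states)
```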



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
