airflow-commits mailing list archives

From "Conrad Lee (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-1419) Trigger Rule not respected downstream of BranchPythonOperator
Date Thu, 15 Nov 2018 13:00:01 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Conrad Lee updated AIRFLOW-1419:
--------------------------------
    Description: 
Let's consider the following DAG:
{noformat}
              ____________________
             /                      \
branch_op                     confluence_op
             \______work_op________/

{noformat}
This is implemented in the following code:
{code:python}
import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='branch_skip_problem',
    default_args=args,
    schedule_interval="@daily")

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda: 'work_op',
    dag=dag)


work_op = DummyOperator(task_id='work_op', dag=dag)

confluence_op = DummyOperator(task_id='confluence_op', dag=dag, trigger_rule=TriggerRule.ALL_DONE)

branch_op.set_downstream(confluence_op)
branch_op.set_downstream(work_op)
work_op.set_downstream(confluence_op)

{code}
Note that branch_op is a BranchPythonOperator, work_op and confluence_op are DummyOperators,
and that confluence_op has its trigger_rule set to ALL_DONE.

In DAG runs where branch_op chooses to activate work_op as its child, confluence_op never
runs. This doesn't seem right, because confluence_op has two parents and its trigger_rule is
set so that it runs as soon as all of its parents are done (whether or not they are skipped).
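
To make the expectation concrete, here is a minimal sketch of how ALL_DONE is expected to
behave. It is a plain-Python model and not Airflow's scheduler code: the function name
all_done_satisfied and the set of state strings are assumptions made for illustration only.

```python
# Hypothetical model of the expected ALL_DONE semantics; NOT Airflow's implementation.
# The state names below are assumptions chosen to mirror common task states.
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def all_done_satisfied(upstream_states):
    """Return True once every upstream task has reached a finished state."""
    return all(state in FINISHED_STATES for state in upstream_states.values())


# branch_op selected work_op, and both parents of confluence_op have finished,
# so under this model confluence_op should be allowed to run.
print(all_done_satisfied({"branch_op": "success", "work_op": "success"}))

# A skipped parent still counts as done under this model.
print(all_done_satisfied({"branch_op": "success", "work_op": "skipped"}))

# A parent that is still running keeps confluence_op waiting.
print(all_done_satisfied({"branch_op": "success", "work_op": "running"}))
```

Under this reading, the DAG above satisfies the rule as soon as branch_op and work_op finish,
which is why confluence_op never running looks like a bug.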

I know this example seems contrived and that in practice there are better ways of conditionally
executing work_op. However, this is the minimal code to illustrate the problem. You can imagine
that this problem might actually crop up in practice where originally there was a good reason
to use the BranchPythonOperator, and then time passes and someone modifies one of the branches
so that it doesn't really contain any children anymore, thus resembling the example.

  was:
Let's consider the following DAG:
{noformat}
              ____________________
             /                      \
branch_op                     confluence_op
             \______work_op________/

{noformat}

This is implemented in the following code:


{code}
import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='branch_skip_problem',
    default_args=args,
    schedule_interval="@daily")

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda: 'right_branch_op1',
    dag=dag)


work_op = DummyOperator(task_id='work_op', dag=dag)

confluence_op = DummyOperator(task_id='confluence_op', dag=dag, trigger_rule=TriggerRule.ALL_DONE)

branch_op.set_downstream(confluence_op)
branch_op.set_downstream(work_op)
work_op.set_downstream(confluence_op)

{code}


Note that branch_op is a BranchPythonOperator, work_op and confluence_op are DummyOperators,
and that confluence_op has its trigger_rule set to ALL_DONE.

In DAG runs where branch_op chooses to activate work_op as its child, confluence_op never
runs.  This doesn't seem right, because confluence_op has two parents and its trigger_rule is
set so that it runs as soon as all of its parents are done (whether or not they are skipped).

I know this example seems contrived and that in practice there are better ways of conditionally
executing work_op.  However, this is the minimal code to illustrate the problem.  You can
imagine that this problem might actually creep up in practice where originally there was a
good reason to use the BranchPythonOperator, and then time passes and someone modifies one
of the branches so that it doesn't really contain any children anymore, thus resembling the
example.


> Trigger Rule not respected downstream of BranchPythonOperator
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-1419
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1419
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.2
>            Reporter: Conrad Lee
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
