airflow-commits mailing list archives

From "Siddharth Anand (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-151) trigger_rule='one_success' not allowing tasks downstream of a BranchPythonOperator to be executed
Date Fri, 20 May 2016 04:43:12 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15292700#comment-15292700 ]

Siddharth Anand commented on AIRFLOW-151:
-----------------------------------------

Hi @william-clark,
First, please file JIRA tickets rather than GitHub issues, since the project is now on Apache.

So, you need to think in terms of a diamond. In the diagram below, the BranchPythonOperator
is a, and the join task with the one_success trigger rule is d. b and c are the legs. You
always need a leg between the branch and the join. In some cases, you just need to add a
dummy operator in order to have a leg.

{panel}
  a
 / \
b   c
 \ /
  d
{panel}

To give you an example, here's mine:

!DAG_Solution_Example.png!

In my example, I branch at check_for_time_to_build_model_branch_condition and join at wait_for_previous_hour.
I need to add a single prejoin_preagg_dummy_job, which is just a dummy operator, right after
the branch operator -- it is c in the diamond diagram above.

The reason is that a task directly downstream of a branch is ALWAYS marked as skipped when
the branch does not select it. Hence, the one_success trigger rule is never evaluated for a
join that hangs directly off the branch. Just add a dummy operator between the verify_vm and
delete_vm tasks. The dummy will be skipped when appropriate, and delete_vm will wait until
one path succeeds.
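
To make the diamond concrete, here is a rough toy model in plain Python. It is not Airflow's
scheduler and uses no Airflow API; the simulate function, its arguments, and the state names
are made up for illustration. It assumes only the behavior described above: a direct child of
the branch that is not selected is skipped outright, and every other task is judged by its
trigger rule.

```python
# Toy model of the diamond. NOT Airflow code: all names here are hypothetical.
SUCCESS, SKIPPED = "success", "skipped"


def simulate(upstreams, rules, order, branch, chosen):
    """Return {task: state} after `branch` selects the task `chosen`.

    upstreams: task -> list of direct upstream tasks
    rules:     task -> "one_success" (any other task defaults to "all_success")
    order:     all tasks in topological order, starting with the branch task
    """
    states = {branch: SUCCESS}
    for task in order:
        if task == branch:
            continue
        ups = upstreams[task]
        if branch in ups and task != chosen:
            # Direct child of the branch that was not selected:
            # skipped outright, its trigger rule is never consulted.
            states[task] = SKIPPED
        elif rules.get(task, "all_success") == "one_success":
            ok = any(states[u] == SUCCESS for u in ups)
            states[task] = SUCCESS if ok else SKIPPED
        else:
            ok = all(states[u] == SUCCESS for u in ups)
            states[task] = SUCCESS if ok else SKIPPED
    return states


join_rule = {"d": "one_success"}

# Problem shape: the join d hangs directly off the branch a (there is no leg c).
broken = simulate(
    upstreams={"b": ["a"], "d": ["a", "b"]},
    rules=join_rule, order=["a", "b", "d"], branch="a", chosen="b",
)

# Diamond shape: the dummy leg c sits between a and d.
diamond = {"b": ["a"], "c": ["a"], "d": ["b", "c"]}
fixed_via_b = simulate(diamond, join_rule, ["a", "b", "c", "d"], "a", "b")
fixed_via_c = simulate(diamond, join_rule, ["a", "b", "c", "d"], "a", "c")

print(broken["d"], fixed_via_b["d"], fixed_via_c["d"])
```

In the problem shape, d is skipped even though b succeeds. With the dummy leg c in place, d
runs whichever leg the branch picks.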



> trigger_rule='one_success' not allowing tasks downstream of a BranchPythonOperator to be executed
> -------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-151
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-151
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Siddharth Anand
>            Assignee: Siddharth Anand
>         Attachments: DAG_Problem.png, DAG_Solution_Example.png
>
>
> Porting from https://github.com/apache/incubator-airflow/issues/1521
> Dear Airflow Maintainers,
> *Environment*
> {panel}
> Airflow version: 1.7.0rc3
> Airflow components: webserver, scheduler, worker, postgres database, CeleryExecutor
> Relevant airflow.cfg settings: nothing special here; mostly defaults
> Python Version: 3.4.3
> Operating System: CentOS 6.7
> Python packages: virtualenv with standard airflow install
> {panel}
> *Background*
> We are constructing a workflow to automate standard business processes around the creation
> and maintenance of virtual machines. After creation, we verify several information points
> on the VM to ensure that it is a viable machine and that no configuration errors occurred.
> If it fails verification and is not running, then it should be deleted. If it fails verification
> and is running, then we stop it first, then delete it.
> *What did you expect to happen?*
> After researching the BranchPythonOperator, I found that I should be using trigger_rule='one_success'
> to allow a task at a join point downstream of the branch(es) to be triggered, as mentioned
> in #1078. So, I defined the task as follows:
> {code}
> delete_vm = PythonOperator(
>      task_id='delete_vm',
>      trigger_rule=TriggerRule.ONE_SUCCESS,
>      python_callable=_delete_vm,
>      provide_context=True,
>      dag=dag)
> delete_vm.set_upstream({poll_vm_stop, verify_vm})
> {code}
> *What happened instead?*
> Rather than executing correctly, the delete_vm task is marked as skipped and is not re-evaluated
> following poll_vm_stop. There is no stack trace available, as the task simply does not execute.
> Sidenote: the PythonSensor you see in the picture below is a sensor which evaluates the truthy-
> or falsey-ness of a Python callable. It has been tested extensively and works as intended.
> !DAG_Problem.png!
> Any help would be greatly appreciated. I've tested various ways of linking the DAG, providing
> DummyOperators as buffers, using a second BranchPythonOperator to explicitly call the task;
> all of these have failed. Am I missing something obvious here?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
