airflow-commits mailing list archives

From "Chris Riccomini (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-267) PythonOperators return value seems to be ignored
Date Mon, 27 Jun 2016 15:51:52 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351290#comment-15351290 ]

Chris Riccomini commented on AIRFLOW-267:
-----------------------------------------

Returning False does not halt the execution of a DAG: the task still succeeds and its downstream tasks still run. Returned values are stored as XCom fields.
From [XCom docs|http://airflow.incubator.apache.org/concepts.html#xcoms]:

{quote}
In addition, if a task returns a value (either from its Operator’s execute() method, or
from a PythonOperator’s python_callable function), then an XCom containing that value is
automatically pushed.
{quote}
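The distinction above (a return value is data, not a control signal) can be illustrated with a small toy model in plain Python. This is a hypothetical sketch, not Airflow's scheduler or API: `run_task` and `run_chain` are invented names, and the model assumes the default behavior where a downstream task runs only if its upstream task succeeded. Returning False leaves every task in the success state; only raising an exception marks a task as failed and blocks the tasks after it.

```python
# Toy model, NOT Airflow's real API: a task's return value becomes an XCom,
# while only a raised exception fails the task and stops its downstream tasks
# (assuming the default "all upstream tasks succeeded" rule).
from typing import Any, Callable, Dict, List, Tuple


def run_task(task_id: str, fn: Callable[[], Any], xcoms: Dict[str, Any]) -> str:
    """Run one task callable and return its resulting state."""
    try:
        result = fn()
    except Exception:
        return "failed"  # raising is what marks a task as failed
    if result is not None:
        xcoms[task_id] = result  # the return value is stored as an XCom
    return "success"  # False, 0 or "" still count as success in this model


def run_chain(
    tasks: List[Tuple[str, Callable[[], Any]]]
) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Run tasks in a linear chain; a task runs only if its upstream succeeded."""
    states: Dict[str, str] = {}
    xcoms: Dict[str, Any] = {}
    upstream_ok = True
    for task_id, fn in tasks:
        if not upstream_ok:
            states[task_id] = "upstream_failed"
            continue
        states[task_id] = run_task(task_id, fn, xcoms)
        upstream_ok = states[task_id] == "success"
    return states, xcoms


def fail() -> None:
    raise RuntimeError("check failed")


# The reporter's chain: check_true -> check_false -> morning.
states, xcoms = run_chain([
    ("check_true", lambda: True),
    ("check_false", lambda: False),
    ("morning", lambda: None),
])
print(states)  # every task succeeds, including morning
print(xcoms)   # the False returned by check_false is kept as data

# Raising instead of returning False does stop the chain.
failed_states, _ = run_chain([
    ("check_true", lambda: True),
    ("check_false", fail),
    ("morning", lambda: None),
])
print(failed_states)  # morning never runs
```

In a real DAG the reporter's `retries=5` would also make a raising task retry before it is finally marked failed; the toy model leaves retries out.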

To skip tasks that you don't want to run, have a look at the BranchPythonOperator.
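A branch operator works differently from a plain return value: its callable returns the task_id of the path to follow, and the other directly downstream tasks are skipped instead of run. The toy model below sketches that idea in plain Python for the reporter's case. It is not Airflow's implementation; `run_branch` and the `skip_morning` no-op task (which in a real DAG might be a DummyOperator) are hypothetical names used only for illustration.

```python
# Toy model, NOT Airflow's real API: a branch callable picks which direct
# downstream task runs; every other direct downstream task is "skipped".
from typing import Callable, Dict, List


def run_branch(
    choose: Callable[[], str], downstream: Dict[str, Callable[[], None]]
) -> Dict[str, str]:
    """Run only the downstream task whose id `choose` returns."""
    chosen = choose()
    if chosen not in downstream:
        raise ValueError(f"branch returned unknown task_id {chosen!r}")
    states: Dict[str, str] = {}
    for task_id, fn in downstream.items():
        if task_id == chosen:
            fn()
            states[task_id] = "success"
        else:
            states[task_id] = "skipped"
    return states


def check() -> bool:
    """Stand-in for the reporter's check_func_false."""
    return False


ran: List[str] = []
branch_states = run_branch(
    lambda: "morning" if check() else "skip_morning",
    {
        "morning": lambda: ran.append("morning"),
        "skip_morning": lambda: None,  # hypothetical no-op task
    },
)
print(branch_states)  # morning is skipped, skip_morning succeeds
print(ran)            # empty: the morning callable never executed
```

The design point is that the branch decision is an explicit task_id, so skipping is a deliberate state rather than a side effect of some task returning False.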

> PythonOperators return value seems to be ignored
> ------------------------------------------------
>
>                 Key: AIRFLOW-267
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-267
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: Airflow 1.7.0
>            Reporter: Eric Johnson
>            Priority: Minor
>
> This is a simple DAG that I don't think should run to completion, as there is a PythonOperator that returns False, which should halt the dependency. Right?
> morning depends on check_false, which depends on check_true.
> check_true's PythonOperator returns True, so that's fine. But check_false is a PythonOperator that returns False. Shouldn't that halt the execution?
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators import BashOperator, PythonOperator
> one_day_ago = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
> args = {
>     'owner': 'ejohnson',
>     'start_date' : one_day_ago,
>     'email' : "ejohnson@example.com",
>     'email_on_failure' : True
> }
> # The DAG that holds every task below; schedule_interval=None means it only runs when triggered
> mydag = DAG(
>     dag_id='mydag',
>     default_args=args,
>     schedule_interval=None
> )
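> # provide_context=True passes the template context (ds, ...) to the callables as keyword arguments
> def check_func_true(ds, **kwargs):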
>     return True
> def check_func_false(ds, **kwargs):
>     return False
> check_false = PythonOperator(
>     task_id='check_false',
>     provide_context=True,
>     python_callable=check_func_false,
>     email="ejohnson@example.com",
>     email_on_retry=True,
>     email_on_failure=True,
>     retries=5,
>     dag=mydag)
> check_true = PythonOperator(
>     task_id='check_true',
>     provide_context=True,
>     python_callable=check_func_true,
>     email="ejohnson@example.com",
>     email_on_retry=True,
>     email_on_failure=True,
>     retries=5,
>     dag=mydag)
> morning = BashOperator(task_id='morning', bash_command="echo morning", dag=mydag)
> # Dependency chain: check_true -> check_false -> morning
> morning.set_upstream(check_false)
> check_false.set_upstream(check_true)
> {code}



