airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matt Inwood (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-894) Trigger Rules not functioning
Date Thu, 23 Feb 2017 16:00:47 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Inwood updated AIRFLOW-894:
--------------------------------
    Description: 
The code below fails to schedule the join task. This happens with trigger rules including
all_done and one_success. It seems to occur only when tasks are generated dynamically.

from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from datetime import datetime
from slackclient import SlackClient


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        print(intentionalFail)

join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)


list = ['A','B','C','D','E','F','G','H','I','J','Z']

for l in list:

    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        # on_failure_callback=on_failure,
        # on_success_callback=on_success,
        dag=dag,
    )

  was:
The code below fails to schedule the join task. This happens with trigger rules including
all_done and one_success. It seems to occur only when tasks are generated dynamically.

from airflow import DAG
from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
from datetime import datetime, timedelta
from datetime import datetime
from slackclient import SlackClient


default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    #'start_date': sixty_days_ago,
    'start_date': datetime(2017, 2, 22),
    'retries': 0
    # 'retry_delay': timedelta(seconds=30),
}

dag = DAG(
    'Valet_Data',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    dagrun_timeout=timedelta(seconds=60))

def valet_function(locdata, ds, **kwargs):
    if locdata == 'D':
        print(intentionalFail)

join = DummyOperator(
    task_id='join',
    trigger_rule='all_done',
    dag=dag
)


list = ['A','B','C','D','E','F','G','H','I','J','Z']

for l in list:

    task = PythonOperator(
        task_id='{0}_PANTS'.format(l),
        provide_context=True,
        python_callable=valet_function,
        op_kwargs={'locdata': l},
        dag=dag,
    )


> Trigger Rules not functioning
> -----------------------------
>
>                 Key: AIRFLOW-894
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-894
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: dependencies
>    Affects Versions: Airflow 1.7.1.3
>            Reporter: Matt Inwood
>            Priority: Blocker
>         Attachments: airflow_error.PNG
>
>
> The code below fails to schedule the join task. This happens with trigger rules including
> all_done and one_success. It seems to occur only when tasks are generated dynamically.
> from airflow import DAG
> from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
> from datetime import datetime, timedelta
> from datetime import datetime
> from slackclient import SlackClient
> default_args = {
>     'owner': 'analytics',
>     'depends_on_past': False,
>     #'start_date': sixty_days_ago,
>     'start_date': datetime(2017, 2, 22),
>     'retries': 0
>     # 'retry_delay': timedelta(seconds=30),
> }
> dag = DAG(
>     'Valet_Data',
>     default_args=default_args,
>     schedule_interval='*/5 * * * *',
>     dagrun_timeout=timedelta(seconds=60))
> def valet_function(locdata, ds, **kwargs):
>     if locdata == 'D':
>         print(intentionalFail)
> join = DummyOperator(
>     task_id='join',
>     trigger_rule='all_done',
>     dag=dag
> )
> list = ['A','B','C','D','E','F','G','H','I','J','Z']
> for l in list:
>     task = PythonOperator(
>         task_id='{0}_PANTS'.format(l),
>         provide_context=True,
>         python_callable=valet_function,
>         op_kwargs={'locdata': l},
>         # on_failure_callback=on_failure,
>         # on_success_callback=on_success,
>         dag=dag,
>     )



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message