airflow-dev mailing list archives

From Paul Elliot <p...@mymusictaste.com>
Subject Issue with Paused SubDAG
Date Thu, 17 Aug 2017 10:14:50 GMT

Hi all,

We've been using Airflow for the past year in our data operations. Been
really happy with it so far!

Environment: Airflow 1.8.0 with the LocalExecutor running on a single
machine.

We recently ran into an issue with a DAG containing one subdag operator with
around 20-30 simple ETL subdags. The relevant code is posted below. I poked
around the mailing list archives but couldn't find anything related.

When running the main DAG, things seem to work for the majority of the
tasks. Then the DAG freezes: scheduler looks fine, main DAG is not paused,
but several of the subdags never run. Their tasks are registered with the
scheduler, and the UI shows the subdag operator task to be 'running'. The
subdag tasks which aren't running have status 'null'. Clicking 'Task
Instance Details' shows me a dependencies problem:

Dependency: Dag Not Paused
Reason: Task's DAG 'ticket_gateways.ticket_gateways_subdag' is paused.

For clarity, "ticket_gateways.ticket_gateways_subdag" is one of the paused
subdags in question. Is there a reason why this happens, and is there
anything I can do to avoid it? Failing that, how do I unpause the subdags?

Thanks,
Paul



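# Airflow 1.8 module paths for the classes used below
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.operators.subdag_operator import SubDagOperator

# ... import stuff ... (project-specific helpers and constants)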

run_date = "{{ ds }}"


def subdag_generator(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None,
    )

    for tg, task, tables in get_ticket_gateways():
        crawl_event_task = PythonOperator(
            task_id='{}-{}_extract_event'.format(child_dag_name, tg),
            default_args=args,
            provide_context=True,
            op_args=['Tickets.extractors', task],
            templates_dict={'date': run_date},
            op_kwargs={'mode': 'extract'},
            trigger_rule='all_success',
            python_callable=airflow_callable,
            dag=dag_subdag)

        process_event_task = PythonOperator(
            task_id='{}-{}_process_event'.format(child_dag_name, tg),
            default_args=args,
            provide_context=True,
            op_args=['Tickets.extractors', task],
            op_kwargs={'mode': 'process'},
            templates_dict={'date': run_date},
            trigger_rule='all_success',
            python_callable=airflow_callable,
            dag=dag_subdag)

        post_to_slack_task = SlackAPIPostOperator(
            task_id='{}-{}_post_to_slack'.format(child_dag_name, tg),
            default_args=args,
            token=SLACK_TOKEN,
            channel=SLACK_LOG_CHANNEL,
            text='{} finished running for execution {}.'.format(
                tg, run_date),
            dag=dag_subdag)

        crawl_event_task.set_downstream(process_event_task)
        process_event_task.set_downstream(post_to_slack_task)

    return dag_subdag


dag = DAG(
    dag_id='ticket_gateways', default_args=DAG_DEFAULT_ARGS,
    schedule_interval=get_dag_schedule()) # just '@weekly' or '@once'

subdag_task = SubDagOperator(
    task_id='ticket_gateways_subdag',
    subdag=subdag_generator(
        'ticket_gateways', 'ticket_gateways_subdag', DAG_DEFAULT_ARGS),
    dag=dag
)

post_to_slack_task = SlackAPIPostOperator(
    task_id='ticket_gateways_post_to_slack',
    token=SLACK_TOKEN,
    channel=SLACK_LOG_CHANNEL,
    trigger_rule='all_done',
    text='success',
    dag=dag
)

notify_slack_task = SlackAPIPostOperator(
    task_id='ticket_gateways_notify_slack',
    token=SLACK_TOKEN,
    channel=SLACK_NOTIFICATION_CHANNEL,
    trigger_rule='one_failed',
    text='failed',
    dag=dag
)

subdag_task.set_downstream([post_to_slack_task, notify_slack_task])


if __name__ == "__main__":
    dag.cli()
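
The "Dag Not Paused" dependency quoted earlier suggests that the paused state is tracked per dag_id, so a subdag named '<parent>.<child>' can be paused while its parent is not. The sketch below is only an illustration of that idea, not Airflow code: it uses an in-memory SQLite table as a stand-in for the metadata database, and it assumes the real `dag` table has `dag_id` and `is_paused` columns. The helper names `paused_subdags` and `unpause` are hypothetical.

```python
import sqlite3

# Stand-in for the Airflow metadata DB, holding only the columns this
# sketch needs. Assumption: the real `dag` table has dag_id and is_paused.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
conn.executemany(
    "INSERT INTO dag (dag_id, is_paused) VALUES (?, ?)",
    [
        ("ticket_gateways", 0),                         # parent: not paused
        ("ticket_gateways.ticket_gateways_subdag", 1),  # subdag: paused
        ("unrelated_dag", 1),                           # must stay untouched
    ],
)


def paused_subdags(conn, parent_dag_id):
    """Return the sorted ids of paused DAGs named '<parent_dag_id>.<child>'."""
    rows = conn.execute("SELECT dag_id FROM dag WHERE is_paused = 1")
    prefix = parent_dag_id + "."
    return sorted(r[0] for r in rows if r[0].startswith(prefix))


def unpause(conn, dag_ids):
    """Clear the paused flag for exactly the given dag ids."""
    conn.executemany(
        "UPDATE dag SET is_paused = 0 WHERE dag_id = ?",
        [(dag_id,) for dag_id in dag_ids],
    )
    conn.commit()


stuck = paused_subdags(conn, "ticket_gateways")
print(stuck)
unpause(conn, stuck)
print(paused_subdags(conn, "ticket_gateways"))
```

On a real installation it is probably safer to go through the CLI than to edit the table by hand, for example `airflow unpause ticket_gateways.ticket_gateways_subdag`. This does not explain why the subdag became paused in the first place.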

-- 

*Paul Elliott | 엘리엇폴*
Developer / Platform Dev. team
paul@mymusictaste.com
+82-10-2990-8642
