airflow-dev mailing list archives

From Paul Elliot <p...@mymusictaste.com>
Subject Re: Issue with Paused SubDAG
Date Mon, 21 Aug 2017 03:34:49 GMT
Quick update, I see a log message on the paused subdag tasks in question:

FIXME: Rescheduling due to concurrency limits reached at task runtime.
Attempt 1 of 2. State set to NONE.

I dunno if that helps or if there's something I can do about this.

Cheers,
P

On Thu, Aug 17, 2017 at 7:14 PM, Paul Elliot <paul@mymusictaste.com> wrote:

> Hi all,
>
> We've been using Airflow for the past year in our data operations. Been
> really happy with it so far!
>
> Environment: Airflow 1.8.0 with the LocalExecutor running on a single
> machine.
>
> Recently ran into an issue with a DAG containing one subdag operator with
> around 20 - 30 simple ETL subdags. Relevant code posted below. I poked
> around the mailing list archives but couldn't find anything related.
>
> When running the main DAG, things seem to work for the majority of the
> tasks. Then the DAG freezes: scheduler looks fine, main DAG is not paused,
> but several of the subdags never run. Their tasks are registered with the
> scheduler, and the UI shows the subdag operator task to be 'running'. The
> subdag tasks which aren't running have status 'null'. Clicking 'Task
> Instance Details' shows me a dependencies problem:
>
> Dependency: Dag Not Paused
> Reason: Task's DAG 'ticket_gateways.ticket_gateways_subdag' is paused.
>
> For clarity, "ticket_gateways.ticket_gateways_subdag" is one of the
> paused subdags in question. Any idea why this happens, what I can do to
> avoid it, or even just how to unpause the subdags?
>
> Thanks,
> Paul
>
>
>
> # ... import stuff ...
>
> run_date = "{{ ds }}"
>
>
> def subdag_generator(parent_dag_name, child_dag_name, args):
>     dag_subdag = DAG(
>         dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
>         default_args=args,
>         schedule_interval=None,
>     )
>
>     for tg, task, tables in get_ticket_gateways():
>         crawl_event_task = PythonOperator(
>             task_id='{}-{}_extract_event'.format(child_dag_name, tg),
>             default_args=args,
>             provide_context=True,
>             op_args=['Tickets.extractors', task],
>             templates_dict={'date': run_date},
>             op_kwargs={'mode': 'extract'},
>             trigger_rule='all_success',
>             python_callable=airflow_callable,
>             dag=dag_subdag)
>
>         process_event_task = PythonOperator(
>             task_id='{}-{}_process_event'.format(child_dag_name, tg),
>             default_args=args,
>             provide_context=True,
>             op_args=['Tickets.extractors', task],
>             op_kwargs={'mode': 'process'},
>             templates_dict={'date': run_date},
>             trigger_rule='all_success',
>             python_callable=airflow_callable,
>             dag=dag_subdag)
>
>         post_to_slack_task = SlackAPIPostOperator(
>             task_id='{}-{}_post_to_slack'.format(child_dag_name, tg),
>             default_args=args,
>             token=SLACK_TOKEN,
>             channel=SLACK_LOG_CHANNEL,
>             text='{} finished running for execution {}.'.format(
>                 tg, run_date),
>             dag=dag_subdag)
>
>         crawl_event_task.set_downstream(process_event_task)
>         process_event_task.set_downstream(post_to_slack_task)
>
>     return dag_subdag
>
>
> dag = DAG(
>     dag_id='ticket_gateways', default_args=DAG_DEFAULT_ARGS,
>     schedule_interval=get_dag_schedule()) # just '@weekly' or '@once'
>
> subdag_task = SubDagOperator(
>     task_id='ticket_gateways_subdag',
>     subdag=subdag_generator(
>         'ticket_gateways', 'ticket_gateways_subdag', DAG_DEFAULT_ARGS),
>     dag=dag
> )
>
> post_to_slack_task = SlackAPIPostOperator(
>     task_id='ticket_gateways_post_to_slack',
>     token=SLACK_TOKEN,
>     channel=SLACK_LOG_CHANNEL,
>     trigger_rule='all_done',
>     text='success',
>     dag=dag
> )
>
> notify_slack_task = SlackAPIPostOperator(
>     task_id='ticket_gateways_notify_slack',
>     token=SLACK_TOKEN,
>     channel=SLACK_NOTIFICATION_CHANNEL,
>     trigger_rule='one_failed',
>     text='failed',
>     dag=dag
> )
>
> subdag_task.set_downstream([post_to_slack_task, notify_slack_task])
>
>
> if __name__ == "__main__":
>     dag.cli()
>
> --
>
> *Paul Elliott | 엘리엇폴*
> Developer / Platform Dev. team
> paul@mymusictaste.com
> +82-10-2990-8642
>
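A note on the DAG id shown in the 'Dag Not Paused' message quoted above: the subdag is registered under the id built in subdag_generator, i.e. the parent dag_id and the SubDagOperator task_id joined by a dot. Below is a minimal sketch of that convention in plain Python. The helper names subdag_dag_id and split_subdag_dag_id are hypothetical and not Airflow functions. The full id is presumably what a command such as `airflow unpause <dag_id>` would need, though whether unpausing the subdag that way clears the stall on 1.8.0 is an assumption and not something confirmed in this thread.

```python
# Hypothetical helpers (not part of Airflow) illustrating the
# '<parent_dag_id>.<task_id>' naming used for the subdag in this thread.


def subdag_dag_id(parent_dag_id, task_id):
    """Build the subdag id the same way subdag_generator does."""
    return '{}.{}'.format(parent_dag_id, task_id)


def split_subdag_dag_id(dag_id):
    """Split a subdag id into (parent_dag_id, task_id) at the last dot."""
    parent_dag_id, _, task_id = dag_id.rpartition('.')
    return parent_dag_id, task_id


full_id = subdag_dag_id('ticket_gateways', 'ticket_gateways_subdag')
print(full_id)
print(split_subdag_dag_id(full_id))
```

Splitting at the last dot rather than the first keeps the sketch usable for nested subdags, where the parent id itself contains dots.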



