airflow-commits mailing list archives

From "Artem Kirillov (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes
Date Tue, 17 Jan 2017 16:19:26 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826323#comment-15826323 ]

Artem Kirillov edited comment on AIRFLOW-74 at 1/17/17 4:18 PM:
----------------------------------------------------------------

Guys, this bug breaks the whole concept of concurrency limiting in Airflow. DAGs are
deadlocked all the time. Any chance of fixing it?

It looks like the issue is caused by a race condition in the DAG.concurrency_reached() method.
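
For illustration, here is a minimal sketch of the kind of check-then-act race such a method can suffer from. This is not the actual Airflow source; the names and numbers (CONCURRENCY_LIMIT, running, schedule_one) are made up:

```
import threading

# Hypothetical numbers, for illustration only.
CONCURRENCY_LIMIT = 4
running = 0  # shared count of "running" task instances

def concurrency_reached():
    # Stand-in for DAG.concurrency_reached(): an unsynchronized
    # read of shared scheduler state.
    return running >= CONCURRENCY_LIMIT

def schedule_one():
    global running
    # Check-then-act without a lock: several threads can pass the
    # check before any of them increments `running`, so the limit
    # is enforced only approximately.
    if not concurrency_reached():
        running += 1

threads = [threading.Thread(target=schedule_one) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(running)  # can exceed CONCURRENCY_LIMIT on some runs
```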


was (Author: akirillov):
Guys, this bug breaks the whole concept of concurrency limiting in Airflow. DAGs are
deadlocked all the time. Any chance of fixing it?

> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
>                 Key: AIRFLOW-74
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
>         Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1 webserver
> 1 scheduler
> 2 workers
>            Reporter: Steven Yvinec-Kruyk
>
> If the number of concurrently running ```SubdagOperator``` tasks is >= the number of celery
> worker processes, tasks are unable to run and all SDOs come to a complete halt. Furthermore,
> the performance of a DAG is drastically reduced even before the workers are fully saturated,
> as progressively fewer workers are available for actual tasks. A workaround for this is to
> specify that the ```SequentialExecutor``` be used by the ```SubdagOperator```:
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the Dag Name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),  # a leading-zero literal (04) is a syntax error in Python 3
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),  # BaseOperator expects retry_delay, not retry_interval
>     'email': ['your@email.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
> # Create the dag object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         # the workaround: run the subdag's tasks in-process with the
>         # SequentialExecutor instead of dispatching them to celery
>         executor=SequentialExecutor(),
>         dag=dag,
>         depends_on_past=True,
>     )
>     return sdo
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
> ```
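
As a scale check, assuming the two-worker environment above: celeryd exposes a fixed number of worker slots across those two machines, and the loop creates 16 SubDagOperators per level, each holding a slot while its child tasks wait for one. Once the running SDOs alone occupy every slot, their children can never be picked up, which matches the reported halt.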



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
