airflow-commits mailing list archives

From "Xiao Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-2372) SubDAGs should share dag_concurrency of parent DAG
Date Tue, 24 Apr 2018 23:30:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Description: 
It seems like right now subDAGs are scheduled just like normal DAGs. So if a DAG has a lot of (parallel) subDAGs, each with a lot of operators, triggering that DAG means those subDAGs get triggered as normal DAGs; they can easily take all of the scheduler's resources (limited by dag_concurrency), and other DAGs have to wait for those subDAGs to finish.

For example, take this DAG, run with a local scheduler and LocalExecutor, with parallelism = 32 and dag_concurrency = 16:

{code:python}
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

log = logging.getLogger(__name__)

NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10
START_DATE = datetime(2018, 1, 1)


def logging_func(op_id):
  log.info("Now running id: {}".format(op_id))


def build_dag(dag_id, num_ops):
  # Sub-DAG ids follow the "<parent_dag_id>.<task_id>" convention expected
  # by SubDagOperator.
  dag = DAG(dag_id, start_date=START_DATE)

  start_op = DummyOperator(task_id='start', dag=dag)

  # Fan out: every operator depends only on the start task, so all of them
  # are eligible to run in parallel.
  for i in range(num_ops):
    op = PythonOperator(
      task_id=str(i),
      python_callable=logging_func,
      op_args=[i],
      dag=dag
    )

    start_op >> op

  return dag


parent_id = 'consistent_failure'
with DAG(
  parent_id,
  start_date=START_DATE
) as dag:

  start_op = DummyOperator(task_id='start')

  # 20 parallel SubDagOperators, each wrapping a 10-operator subDAG.
  for i in range(NUM_SUBDAGS):
    task_id = "subdag_{}".format(i)
    op = SubDagOperator(
      task_id=task_id,
      subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
    )

    start_op >> op
{code}

When I trigger this DAG, Airflow tries to run many of the subDAGs at the same time. Since they don't share dag_concurrency with their parent DAG, each of them also tries to run all of its operators in parallel, which results in 500+ Python processes created by Airflow.
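
As a rough back-of-the-envelope check on that number (a sketch only, assuming the parent DAG is throttled by its dag_concurrency, each subDAG is throttled only by its own dag_concurrency, and each running SubDagOperator drives its subDAG with its own executor):

{code:python}
# Rough estimate only; the exact process count depends on executor internals.
DAG_CONCURRENCY = 16
NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10

# Up to 16 SubDagOperator tasks can run at once in the parent DAG.
running_subdag_ops = min(NUM_SUBDAGS, DAG_CONCURRENCY)              # 16

# Each running subDAG can run all of its own operators, since 10 < 16.
tasks_per_subdag = min(NUM_OPS_PER_SUBDAG, DAG_CONCURRENCY)         # 10

concurrent_task_instances = running_subdag_ops * tasks_per_subdag   # 160
print(concurrent_task_instances)
{code}

Since under LocalExecutor each running task instance corresponds to more than one OS process (the airflow run supervisor plus the raw task), and each running subDAG adds its own backfill/executor processes on top, 160 concurrent task instances can plausibly account for the 500+ processes described above.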

Ideally those subDAGs would share dag_concurrency with their parent DAG (and thus with each other too), so that when I trigger this DAG, at most 16 operators, including the ones in the subDAGs, are running at any given time.
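
A partial mitigation in the meantime (a sketch only, and not what this issue asks for, since it caps each subDAG independently rather than sharing the parent's budget) is to pass an explicit per-DAG concurrency when building each subDAG; the DAG constructor accepts a concurrency argument that defaults to dag_concurrency. build_capped_dag below is a hypothetical variant of the build_dag helper above, reusing its logging_func and START_DATE:

{code:python}
# Hypothetical variant of build_dag() above: cap how many task instances each
# subDAG may run at once via the DAG-level "concurrency" argument. This caps
# each subDAG separately; it does NOT make the subDAGs share the parent's
# dag_concurrency budget, which is what this issue requests.
def build_capped_dag(dag_id, num_ops, max_active_tasks=2):
  dag = DAG(dag_id, start_date=START_DATE, concurrency=max_active_tasks)

  start_op = DummyOperator(task_id='start', dag=dag)
  for i in range(num_ops):
    start_op >> PythonOperator(
      task_id=str(i),
      python_callable=logging_func,
      op_args=[i],
      dag=dag
    )
  return dag
{code}

With dag_concurrency = 16 on the parent and max_active_tasks = 2 per subDAG, at most about 16 * 2 = 32 subDAG task instances run at once instead of ~160, but the parent DAG and its subDAGs still draw from separate limits.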



> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: Airflow 1.9.0, local scheduler with LocalExecutor, parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Priority: Major
>



