airflow-commits mailing list archives

From "Xiao Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-2372) SubDAGs should share parallelism of parent DAG
Date Tue, 24 Apr 2018 22:36:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Description: 
It seems like right now subDAGs are scheduled just like normal DAGs: if a DAG has many
(parallel) subDAGs, each with many operators, triggering that DAG means those subDAGs get
triggered as independent normal DAGs. They can easily consume all of the scheduler's resources
(each limited only by its own dag_concurrency), and other DAGs have to wait for those subDAGs
to finish.
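
For reference, both limits come from the [core] section of airflow.cfg; here is a minimal
check of the effective values, using the standard airflow.configuration module (the printed
numbers are just this environment's settings):
{quote}
from airflow import configuration as conf

# Total task slots across the whole executor, and running tasks allowed per DAG.
print(conf.getint('core', 'parallelism'))      # 32 in this environment
print(conf.getint('core', 'dag_concurrency'))  # 16 in this environment
{quote}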

For example, suppose I have the DAG below, running with a local scheduler and LocalExecutor,
with parallelism = 32 and dag_concurrency = 16:
{quote}
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

log = logging.getLogger(__name__)

NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10

# A start_date is required for task instances to actually run.
DEFAULT_ARGS = {'start_date': datetime(2018, 1, 1)}


def logging_func(id):
  log.info("Now running id: {}".format(id))


def build_dag(dag_id, num_ops):
  # Each subDAG is a 'start' task fanning out to num_ops parallel tasks.
  dag = DAG(dag_id, default_args=DEFAULT_ARGS, schedule_interval=None)

  start_op = DummyOperator(task_id='start', dag=dag)

  for i in range(num_ops):
    op = PythonOperator(
      task_id=str(i),
      python_callable=logging_func,
      op_args=[i],
      dag=dag
    )

    start_op >> op

  return dag


parent_id = 'consistent_failure'
with DAG(parent_id, default_args=DEFAULT_ARGS, schedule_interval=None) as dag:

  start_op = DummyOperator(task_id='start')

  for i in range(NUM_SUBDAGS):
    task_id = "subdag_{}".format(i)
    # SubDagOperator requires the subdag's dag_id to be '<parent_dag_id>.<task_id>'.
    op = SubDagOperator(
      task_id=task_id,
      subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
    )

    start_op >> op
{quote}

When I trigger this DAG, Airflow tries to run many of the subDAGs at the same time, and since
they don't share parallelism with their parent DAG, each of them also tries to run all of
its operators in parallel. The result is 500+ Python processes created by Airflow.
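
Here is where the 500+ likely comes from (a back-of-the-envelope sketch; the assumption that
every concurrently running SubDagOperator's backfill brings up its own full-size LocalExecutor
is mine, not something I've verified in the scheduler code):
{quote}
# Assumption: each running SubDagOperator starts its own backfill with its own
# LocalExecutor worker pool, and nothing is shared with the parent DAG.
NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10
DAG_CONCURRENCY = 16  # per-DAG running-task limit
PARALLELISM = 32      # worker processes per LocalExecutor

concurrent_subdags = min(NUM_SUBDAGS, DAG_CONCURRENCY)    # 16 subDAGs at once
worker_processes = concurrent_subdags * PARALLELISM       # 16 * 32 = 512 workers
task_processes = concurrent_subdags * NUM_OPS_PER_SUBDAG  # up to 160 task runs

print(worker_processes)  # 512, consistent with the 500+ processes observed
{quote}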

Ideally those subDAGs would share parallelism with their parent DAG (and thus with each other),
so that when I trigger this DAG, at most 32 operators, including the ones inside the subDAGs,
are running at any given time.
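
Until something like that exists, one partial mitigation with the current API is SubDagOperator's
executor argument (this is a sketch against the 1.9 signature; it serializes each subDAG's own
tasks rather than truly sharing the parent's slots). The loop body in the example above would
become:
{quote}
from airflow.executors.sequential_executor import SequentialExecutor

op = SubDagOperator(
  task_id=task_id,
  subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG),
  # Run this subDAG's tasks one at a time instead of spinning up a fresh
  # LocalExecutor per subDAG.
  executor=SequentialExecutor()
)
{quote}
Putting all subDAG tasks into a shared Airflow pool is another way to cap their combined
concurrency, but neither approach makes subDAG tasks count against the parent DAG's own
limits, which is what this issue asks for.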



> SubDAGs should share parallelism of parent DAG
> ----------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: Airflow 1.9.0, local scheduler with LocalExecutor, parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Priority: Major
>



