airflow-commits mailing list archives

From "Xiao Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-2372) SubDAGs should share parallelism of parent DAG
Date Tue, 24 Apr 2018 22:35:00 GMT
Xiao Zhu created AIRFLOW-2372:
---------------------------------

             Summary: SubDAGs should share parallelism of parent DAG
                 Key: AIRFLOW-2372
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
             Project: Apache Airflow
          Issue Type: Wish
    Affects Versions: Airflow 1.9.0
         Environment: Airflow 1.9.0, a local scheduler with LocalExecutor,
                      parallelism = 32, dag_concurrency = 16
            Reporter: Xiao Zhu


It seems like right now subDAGs are scheduled just like normal DAGs, so if a DAG has a lot
of (parallel) subDAGs, each with a lot of operators, triggering that DAG means those
subDAGs get triggered as normal DAGs. They can easily take all of the scheduler's resources
(limited by dag_concurrency), and other DAGs then have to wait for those subDAGs.

For example, suppose I have this DAG, with a local scheduler and LocalExecutor, parallelism
= 32, and dag_concurrency = 16:
{quote}
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

log = logging.getLogger(__name__)

NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10

# Tasks need a start_date; the parent DAG and all subDAGs share one.
DEFAULT_ARGS = {'start_date': datetime(2018, 1, 1)}


def logging_func(id):
  log.info("Now running id: {}".format(id))


def build_dag(dag_id, num_ops):
  # dag_id must follow the '<parent_dag_id>.<subdag_task_id>' convention
  # that SubDagOperator expects.
  dag = DAG(dag_id, default_args=DEFAULT_ARGS)

  start_op = DummyOperator(task_id='start', dag=dag)

  for i in range(num_ops):
    op = PythonOperator(
      task_id=str(i),
      python_callable=logging_func,
      op_args=[i],
      dag=dag
    )

    start_op >> op

  return dag

parent_id = 'consistent_failure'
with DAG(
  parent_id,
  default_args=DEFAULT_ARGS
) as dag:

  start_op = DummyOperator(task_id='start')

  for i in range(NUM_SUBDAGS):
    task_id = "subdag_{}".format(i)
    op = SubDagOperator(
      task_id=task_id,
      subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
    )

    start_op >> op
{quote}

When I trigger this DAG, Airflow tries to run many of the subDAGs at the same time, and since
they don't share parallelism with their parent DAG, each of them tries to run its operators
in parallel, which results in 500+ Python processes created by Airflow.
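
For what it's worth, one stopgap (not the fix this issue asks for): SubDagOperator in 1.9
accepts an executor argument, so passing a SequentialExecutor makes each subDAG run its
tasks one at a time instead of fanning them all out through the parent's LocalExecutor.
A sketch, replacing the SubDagOperator call in the loop above:
{quote}
from airflow.executors.sequential_executor import SequentialExecutor

op = SubDagOperator(
  task_id=task_id,
  subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG),
  # Run this subDAG's tasks serially in the SubDagOperator's own process
  # rather than submitting them all to the parent's LocalExecutor.
  executor=SequentialExecutor()
)
{quote}
This caps the process count at roughly one per subDAG, but at the cost of fully
serializing every subDAG.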

Ideally those subDAGs should share parallelism with their parent DAG (and thus with each other
too), so that when I trigger this DAG, at most 32 operators, including the ones in the
subDAGs, are running at any time.
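
In the meantime, a pool can approximate that cap: if every subDAG task draws from one shared
pool with 32 slots, at most 32 of them run at once no matter how many subDAGs are triggered.
A sketch of the PythonOperator inside build_dag, assuming a pool named subdag_pool with 32
slots has been created in the UI (the pool belongs on the subDAG's tasks, not on the
SubDagOperator itself, since an operator holding a slot its own tasks need can deadlock
on a small pool):
{quote}
op = PythonOperator(
  task_id=str(i),
  python_callable=logging_func,
  op_args=[i],
  # All subDAG tasks share one pool, so no more than the pool's
  # 32 slots run concurrently across every subDAG.
  pool='subdag_pool',
  dag=dag
)
{quote}
This is still only a workaround; it doesn't make the subDAGs actually share the parent
DAG's parallelism, which is what this issue asks for.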



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
