airflow-dev mailing list archives

From Taylor Edmiston <tedmis...@gmail.com>
Subject Re: How to get airflow to add thousands of tasks to celery at one time?
Date Wed, 27 Jun 2018 21:39:02 GMT
It sounds like you may be getting bottlenecked by executor concurrency
settings.

Are you using default values for the other concurrency settings,
specifically the ones mentioned here
<https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
If you increase those to very high values as well, do you still see the
issue?
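For anyone finding this in the archives, a rough airflow.cfg sketch of the
settings in question for Airflow 1.9 (values are illustrative, not
recommendations; `parallelism` alone is not enough because the per-DAG and
pool-slot caps also apply):

```ini
[core]
# Hard cap on task instances running across the whole installation
parallelism = 10000
# Task instances allowed to run concurrently per DAG
dag_concurrency = 10000
# Tasks schedulable without an explicit pool (1.9-era setting)
non_pooled_task_slot_count = 10000
# Concurrent runs allowed per DAG
max_active_runs_per_dag = 1

[celery]
# Tasks each Celery worker runs at once (renamed worker_concurrency in 1.10)
celeryd_concurrency = 16
```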

Taylor

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <
kevin.christian.pauli@monsanto.com> wrote:

> Greetings Airflowers.  I'm evaluating Airflow 1.9.0 for our distributed
> orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
> something strange.
>
> I made a dag that has three main stages: 1) start, 2) fan out and run N
> tasks concurrently, 3) finish.
>
> N can be large, maybe up to 10K.  I would expect to see N tasks get dumped
> onto the Rabbit queue when stage 2 begins.  Instead I am seeing only a few
> hundred added at a time.  As the workers process the tasks and the queue
> gets smaller, more get added to Celery/Rabbit.  Eventually it does
> finish; however, I would really prefer that it dump ALL the work (all 10K
> tasks) into Celery immediately, for two reasons:
>
>
>   1.  The current way makes the scheduler long-lived and stateful.  The
> scheduler might die after only 5K have completed, in which case the
> remaining 5K tasks would never get added (I verified this)
>   2.  I want to use the size of the Rabbit queue as metric to trigger
> autoscaling events to add more workers.  So I need a true picture of how
> much outstanding work remains (10K, not a few hundred)
>
> I assume the scheduler has some kind of throttle that keeps it from
> dumping all 10K messages simultaneously?  If so is this configurable?
>
> FYI I have already set “parallelism” to 10K in the airflow.cfg
>
> Here is my test dag:
>
> # This dag tests how well airflow fans out
>
> from airflow import DAG
> from datetime import datetime, timedelta
>
> from airflow.operators.bash_operator import BashOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2015, 6, 1),
>     'email': ['airflow@example.com'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
> }
>
> dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>
> num_tasks = 1000
>
> starting = BashOperator(
>     task_id='starting',
>     bash_command='echo starting',
>     dag=dag
> )
>
> all_done = BashOperator(
>     task_id='all_done',
>     bash_command='echo all done',
>     dag=dag)
>
> for i in range(0, num_tasks):
>     task = BashOperator(
>         task_id='say_hello_' + str(i),
>         bash_command='echo hello world',
>         dag=dag)
>     task.set_upstream(starting)
>     task.set_downstream(all_done)
>
>
>
> --
> Regards,
> Kevin Pauli
>
> This email and any attachments were sent from a Monsanto email account and
> may contain confidential and/or privileged information. If you are not the
> intended recipient, please contact the sender and delete this email and any
> attachments immediately. Any unauthorized use, including disclosing,
> printing, storing, copying or distributing this email, is prohibited. All
> emails and attachments sent to or from Monsanto email accounts may be
> subject to monitoring, reading, and archiving by Monsanto, including its
> affiliates and subsidiaries, as permitted by applicable law. Thank you.
>
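As an aside on Kevin's second point: once the full backlog is visible on
the queue, a worker-count target can be derived from queue depth.  A
minimal sketch, assuming each worker drains roughly `tasks_per_worker`
tasks at a time (all names here are illustrative, not Airflow or Celery
APIs):

```python
import math

def desired_workers(queue_depth, tasks_per_worker=16,
                    min_workers=1, max_workers=50):
    """Scale the worker count with outstanding queue depth, clamped to a range."""
    needed = math.ceil(queue_depth / tasks_per_worker)
    return max(min_workers, min(max_workers, needed))
```

With the full 10K tasks visible, this would immediately request the
maximum; with only a few hundred visible (the throttled behavior Kevin
describes), it would under-provision.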
