airflow-commits mailing list archives

From "Gerard Toonstra (JIRA)" <>
Subject [jira] [Commented] (AIRFLOW-584) Airflow Pool does not limit running tasks
Date Sun, 06 Nov 2016 21:22:58 GMT


Gerard Toonstra commented on AIRFLOW-584:

I'm writing up a detailed explanation in AIRFLOW-72 of why the race condition exists and how
specific settings influence it. It comes down to how pool availability is calculated.

This calculation revolves around checking the config setting and looking at the number of
task instances in RUNNING state that are using that pool. There is a time window between the
scheduler doing these calculations and the task instances being sent to the executor and actually
starting to RUN (which is when they are actually set into RUNNING state). 
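
To make that calculation concrete, here is a minimal sketch in plain Python. It is not Airflow's actual code: the `open_slots` helper and the state strings are hypothetical names chosen for illustration. The only point it shows is that a task already sent to the executor but not yet RUNNING does not reduce the computed availability.

```python
# Hypothetical state labels, for illustration only.
RUNNING = "running"
QUEUED = "queued"  # sent to the executor, not yet started on a worker


def open_slots(pool_size, task_states):
    """Return free slots, counting only task instances in RUNNING state."""
    running = sum(1 for state in task_states if state == RUNNING)
    return pool_size - running


# Five tasks were just handed to the executor but none has started yet,
# so the pool still looks completely free.
print(open_slots(5, [QUEUED] * 5))
# Once three of them have started, only those three are counted.
print(open_slots(5, [RUNNING] * 3 + [QUEUED] * 2))
```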

Within a single scheduling loop, Airflow wouldn't violate the limit. But if your workers are
congested and your scheduler heartbeat is set low, then each loop could send N tasks to the
executor, because the ones already sent to the executor wouldn't have updated their status
yet. In your case, there are apparently 25-30 seconds of congestion on your workers for some
reason in a specific time interval.
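
The effect of repeated loops can be sketched with a small simulation. This is a simplified model, not the scheduler's real logic: `total_dispatched` and its parameters are hypothetical, every task is assumed to reach RUNNING exactly `start_delay_sec` after being sent, and task completion is ignored so that only the over-dispatch is visible.

```python
def total_dispatched(pool_size, heartbeat_sec, start_delay_sec,
                     total_tasks, horizon_sec):
    """Count tasks sent to the executor when RUNNING status lags behind."""
    start_times = []  # moment at which each dispatched task becomes RUNNING
    for now in range(0, horizon_sec, heartbeat_sec):
        # The scheduler only sees tasks that have already flipped to RUNNING.
        running = sum(1 for t in start_times if t <= now)
        free = max(pool_size - running, 0)
        to_send = min(free, total_tasks - len(start_times))
        start_times.extend([now + start_delay_sec] * to_send)
    return len(start_times)


# Pool of 5 and a 5 second heartbeat, as in the reported configuration.
print(total_dispatched(5, 5, 0, 50, 60))   # no delay: prints 5
print(total_dispatched(5, 5, 25, 50, 60))  # 25 s delay: prints 25
print(total_dispatched(5, 5, 30, 50, 60))  # 30 s delay: prints 30
```

Under these assumptions the pool limit is exceeded by one full batch of N tasks for every loop that runs before the first batch reports RUNNING.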

I don't know the reason for that. If you're running this DAG alongside other DAGs on your system,
then maybe all workers are tied up and these task instances do not actually start on the workers
until 25-30 seconds later. Or it could be something related to the Celery backend you're

In the upcoming Airflow release, this issue will already be somewhat mitigated, because each
task runs a dependency check on the pool at the time it starts. This is still not 100% proof
against the race condition (tasks A and B starting simultaneously could both read the state
at time X, so neither sees the other's update at the time of reading), but it's
an improvement over the current implementation, which doesn't have that check.
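
The remaining window is a classic check-then-act race. The sketch below is a hypothetical in-memory model, not the Airflow implementation: `Pool`, `try_start` and the two driver functions are invented for illustration. It compares tasks that each read the current state before starting with tasks that all act on the same stale snapshot.

```python
class Pool:
    """Hypothetical in-memory stand-in for a pool's bookkeeping."""

    def __init__(self, size, running=0):
        self.size = size
        self.running = running


def try_start(pool, observed_running):
    """Start a task only if the observed RUNNING count leaves a free slot."""
    if observed_running < pool.size:
        pool.running += 1
        return True
    return False


def start_sequentially(pool, n_tasks):
    """Each task reads the up-to-date count right before it starts."""
    for _ in range(n_tasks):
        try_start(pool, pool.running)
    return pool.running


def start_simultaneously(pool, n_tasks):
    """All tasks read the count at the same instant, then all act on it."""
    snapshot = pool.running
    for _ in range(n_tasks):
        try_start(pool, snapshot)
    return pool.running


# One free slot out of five, two tasks trying to start.
print(start_sequentially(Pool(5, running=4), 2))    # prints 5
print(start_simultaneously(Pool(5, running=4), 2))  # prints 6
```

In this model the overshoot is bounded by the number of tasks that start at the same instant, rather than by the number of scheduler loops that ran during the delay.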

> Airflow Pool does not limit running tasks
> -----------------------------------------
>                 Key: AIRFLOW-584
>                 URL:
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: pools
>    Affects Versions: Airflow
>         Environment: Ubuntu 14.04
>            Reporter: David
>         Attachments: img1.png, img2.png
> Airflow pools are not limiting the number of running task instances for the following
> dag in
> Steps to recreate:
> Create a pool of size 5 through the UI.
> The following dag has 52 tasks with increasing priority corresponding to the task number.
> There should only ever be 5 tasks running at a time; however, I observed 29 'used slots' in
> a pool with 5 slots.
> {code}
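> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> dag_name = 'pools_bug'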
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 10, 20),
>     'email_on_failure': False,
>     'retries': 1
> }
> dag = DAG(dag_name, default_args=default_args, schedule_interval="0 8 * * *")
> start = DummyOperator(task_id='start', dag=dag)
> end = DummyOperator(task_id='end', dag=dag)
> for i in range(50):
>     sleep_command = 'sleep 10'
>     task_name = 'task-{}'.format(i)
>     op = BashOperator(
>         task_id=task_name,
>         bash_command=sleep_command,
>         execution_timeout=timedelta(hours=4),
>         priority_weight=i,
>         pool=dag_name,
>         dag=dag)
>     start.set_downstream(op)
>     end.set_upstream(op)
> {code}
> Relevant configurations from airflow.cfg:
> {code}
> [core]
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor
> executor = CeleryExecutor
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 64
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 64
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 1
> [celery]
> # This section only applies if you are using the CeleryExecutor in
> # [core] section above
> # The app name that will be used by celery
> celery_app_name = airflow.executors.celery_executor
> # The concurrency that will be used when starting workers with the
> # "airflow worker" command. This defines the number of task instances that
> # a worker will take, so size up your workers based on the resources on
> # your worker box and the nature of your tasks
> celeryd_concurrency = 64
> [scheduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> {code}
> !img1.png!
> !img2.png!
