airflow-commits mailing list archives

From "Dmytro Kulyk (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-584) Airflow Pool does not limit running tasks
Date Thu, 07 Sep 2017 11:45:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156307#comment-16156307 ]

Dmytro Kulyk edited comment on AIRFLOW-584 at 9/7/17 11:44 AM:
---------------------------------------------------------------

The same situation appears on 1.8.1 running the LocalExecutor with a PostgreSQL backend when
running a backfill within a pool.
*Case #1*
Config:
* parallelism = 16
* dag_concurrency = 8
* max_active_runs_per_dag = 16
* max_threads = 2

DAG: 
* max_active_runs = 4

Pool: 
* slots = 4
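
For reference, these settings correspond to the following airflow.cfg entries (a sketch
based on the stock 1.8 config layout, where max_threads lives under [scheduler]):
{code}
[core]
parallelism = 16
dag_concurrency = 8
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2
{code}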

Result: 
* 10 task instances are running; the others are initially queued but eventually become "None",
with the following message in the task log:
{code}
[2017-09-06 23:46:34,211] {base_task_runner.py:95} INFO - Subtask: [2017-09-06 23:46:34,211]
{models.py:1122} INFO - Dependencies not met for <TaskInstance: cube_update.update_bets_cube
2017-06-02 07:15:00 [queued]>, dependency 'Task Instance Slots Available' FAILED: The maximum
number of running tasks (cube_update) for this task's DAG '8' has been reached.
{code}
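
For reference, a minimal sketch of how the DAG and pool limit in this case would be wired
up (the DAG and task ids are taken from the log above; the schedule, dates, command and pool
name are assumptions):
{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'cube_update',
    start_date=datetime(2017, 6, 1),
    schedule_interval='@hourly',  # assumption; the log shows a 07:15 run
    max_active_runs=4,            # DAG-level cap from the settings above
)

# The task is assigned to a 4-slot pool created beforehand (name assumed),
# so no more than 4 of its instances should ever run at once.
update_bets_cube = BashOperator(
    task_id='update_bets_cube',
    bash_command='sleep 10',      # placeholder for the real PostgreSQL call
    pool='cube_update',
    dag=dag,
)
{code}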

*Case #2*
Config:
* same as Case #1, but with dag_concurrency = 16

Result:
* 16 task instances are running; all others are queued and wait until the running ones complete

So, as seen above, the pool size of 4 is not respected in either case.

Since the task in question is the same PostgreSQL procedure updating the same table, this
level of parallelism hurts the overall performance of the dataflow, which is why we need
(and have tried) to limit concurrency using pool or DAG settings.




> Airflow Pool does not limit running tasks
> -----------------------------------------
>
>                 Key: AIRFLOW-584
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-584
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: pools
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Ubuntu 14.04
>            Reporter: David
>         Attachments: img1.png, img2.png
>
>
> Airflow pools are not limiting the number of running task instances for the following
> DAG in 1.7.1.3.
> Steps to recreate:
> Create a pool of size 5 through the UI (a programmatic alternative is sketched after the
> DAG code below).
> The following DAG has 52 tasks with increasing priority corresponding to the task number.
> There should only ever be 5 tasks running at a time; however, I observed 29 'used slots' in
> a pool with 5 slots:
> {code}
> from datetime import datetime, timedelta
> 
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> 
> dag_name = 'pools_bug'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 10, 20),
>     'email_on_failure': False,
>     'retries': 1
> }
> dag = DAG(dag_name, default_args=default_args, schedule_interval="0 8 * * *")
> start = DummyOperator(task_id='start', dag=dag)
> end = DummyOperator(task_id='end', dag=dag)
> # 50 pooled tasks fan out between start and end, all sharing the 5-slot 'pools_bug' pool
> for i in range(50):
>     sleep_command = 'sleep 10'
>     task_name = 'task-{}'.format(i)
>     op = BashOperator(
>         task_id=task_name,
>         bash_command=sleep_command,
>         execution_timeout=timedelta(hours=4),
>         priority_weight=i,
>         pool=dag_name,
>         dag=dag)
>     start.set_downstream(op)
>     end.set_upstream(op)
> {code}
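> As noted above, the pool can also be created programmatically rather than through the UI,
> e.g. via the metadata models (a sketch only; this touches Airflow internals, which may
> differ across versions):
> {code}
> from airflow import settings
> from airflow.models import Pool
> 
> # Register the 5-slot 'pools_bug' pool used by the DAG above in the metadata DB.
> session = settings.Session()
> session.add(Pool(pool=dag_name, slots=5, description='AIRFLOW-584 repro'))
> session.commit()
> {code}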
> Relevant configurations from airflow.cfg:
> {code}
> [core]
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor
> executor = CeleryExecutor
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 64
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 64
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 1
> [celery]
> # This section only applies if you are using the CeleryExecutor in
> # [core] section above
> # The app name that will be used by celery
> celery_app_name = airflow.executors.celery_executor
> # The concurrency that will be used when starting workers with the
> # "airflow worker" command. This defines the number of task instances that
> # a worker will take, so size up your workers based on the resources on
> # your worker box and the nature of your tasks
> celeryd_concurrency = 64
> [scheduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> {code}
> !img1.png!
> !img2.png!



