airflow-commits mailing list archives

From "Shenghu Yang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-111) DAG concurrency is not honored
Date Wed, 08 Jun 2016 04:23:21 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319568#comment-15319568 ]

Shenghu Yang edited comment on AIRFLOW-111 at 6/8/16 4:22 AM:
--------------------------------------------------------------

I think the bug is here:
https://github.com/apache/incubator-airflow/blob/18009d03311a0b29e14811865e0b13b19427b5e4/airflow/models.py#L1233-L1247

Say a task has already been 'QUEUED'. The next time execution reaches the code above, it
does NOT check 'self.task.dag.concurrency_reached', so the task runs even when the DAG's
concurrency limit has already been reached.
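To make the failure mode concrete, here is a toy model of the path described above. It is a sketch under the reading in this comment, not Airflow's code: `ToyDag` and `try_run_buggy` are hypothetical names, and the only behaviour modelled is "the concurrency check is skipped for tasks that are already QUEUED".

```python
# Toy model of the reported bug; NOT Airflow's implementation.
# ToyDag and try_run_buggy are hypothetical names used for illustration.


class ToyDag(object):
    def __init__(self, concurrency):
        self.concurrency = concurrency  # per-DAG limit, like dag_args['concurrency']
        self.running = 0                # tasks currently running for this DAG

    @property
    def concurrency_reached(self):
        return self.running >= self.concurrency


def try_run_buggy(dag, state):
    """Start the task and return True, unless the (buggy) check blocks it.

    The DAG-level check is only applied to tasks that are not yet QUEUED,
    so a task that was QUEUED on an earlier pass bypasses it.
    """
    if state != 'QUEUED' and dag.concurrency_reached:
        return False
    dag.running += 1
    return True


dag = ToyDag(concurrency=8)
# 25 tasks that were already QUEUED on an earlier scheduler pass.
started = sum(try_run_buggy(dag, 'QUEUED') for _ in range(25))
print(started)  # 25, well above the limit of 8
```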

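One fix would be to add the following right after the above code:
########################
        # Goes right after the linked code in airflow/models.py, inside the
        # TaskInstance method (hence self.task.dag); assumes `logging` is
        # imported in models.py. Do not run the task while the DAG is at its limit.
        if self.task.dag.concurrency_reached:
            logging.warning('####dag {} concurrency {} reached'.format(
                self.dag_id, self.task.dag.concurrency))
            return
        else:
            logging.warning('####dag {} concurrency {} NOT reached'.format(
                self.dag_id, self.task.dag.concurrency))
########################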
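The effect of the proposed guard can be illustrated with a toy model (hypothetical `ToyDag` / `try_run_fixed` names, not Airflow's classes). Because the check no longer depends on the task's prior state, QUEUED tasks are held back once the DAG hits its limit. This sketch only counts starts within one pass; whether a held-back task is picked up again on a later pass depends on the real scheduler and is not modelled here.

```python
# Toy model of the proposed fix; NOT Airflow's implementation.
# ToyDag and try_run_fixed are hypothetical names used for illustration.


class ToyDag(object):
    def __init__(self, concurrency):
        self.concurrency = concurrency  # per-DAG limit
        self.running = 0                # tasks currently running for this DAG

    @property
    def concurrency_reached(self):
        return self.running >= self.concurrency


def try_run_fixed(dag, state):
    """Start the task and return True unless the DAG is at its limit.

    `state` is accepted only to mirror the buggy version's signature; the
    guard is applied to every task, including already-QUEUED ones.
    """
    if dag.concurrency_reached:
        return False
    dag.running += 1
    return True


dag = ToyDag(concurrency=8)
started = sum(try_run_fixed(dag, 'QUEUED') for _ in range(25))
print(started)  # 8
```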


was (Author: shenghu):
I think the bug is here:
https://github.com/apache/incubator-airflow/blob/18009d03311a0b29e14811865e0b13b19427b5e4/airflow/models.py#L1233-L1247

Say a task has already been 'QUEUED'. The next time execution reaches the code above, it
does NOT check 'self.task.dag.concurrency_reached', so the task runs even when the DAG's
concurrency limit has already been reached.

One fix would be to add the following right after the above code:
########################
        if self.task.dag.concurrency_reached:
            logging.info("####dag {} concurrency {} reached".format(
                self.dag_id, self.task.dag.concurrency))
            return
########################

> DAG concurrency is not honored
> ------------------------------
>
>                 Key: AIRFLOW-111
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-111
>             Project: Apache Airflow
>          Issue Type: Sub-task
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.6.2, Airflow 1.7.1.2
>         Environment: Version of Airflow: 1.6.2
> Airflow configuration: Running a Scheduler with LocalExecutor
> Operating System: 3.13.0-74-generic #118-Ubuntu SMP Thu Dec 17 22:52:10 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
> Python Version: 2.7.6
> Screen shots of your DAG's status:
>            Reporter: Shenghu Yang
>             Fix For: Airflow 2.0
>
>
> Description of Issue
> In our DAG we set dag_args['concurrency'] = 8. However, once the scheduler starts, this concurrency is not honored: the Airflow scheduler runs up to 'parallelism' (which we set to 25) jobs at once.
> What did you expect to happen?
> dag_args['concurrency'] = 8 is honored, i.e. at most 8 jobs run concurrently.
> What happened instead?
> When the DAG starts to run, the concurrency is not honored: the Airflow scheduler/Celery worker runs up to 'parallelism' (which we set to 25) jobs at once.
> Here is how you can reproduce this issue on your machine:
> Create a DAG that contains nothing but 25 parallel jobs.
> Set the DAG's dag_args['concurrency'] = 8.
> Set the Airflow parallelism to 25.
> Then run: airflow scheduler
> You will see all 25 jobs scheduled to run, not 8.
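One way to check the reproduction is to compare the peak number of simultaneously running tasks against the limit you expect. The sketch below assumes you have collected (start, end) timestamps for each task instance from wherever you normally inspect them; `peak_concurrency` and `expected_cap` are hypothetical helper names, and treating the expected cap as the smaller of DAG concurrency and parallelism assumes a single active DAG and no pools.

```python
# Hypothetical helpers for checking the reproduction; not part of Airflow.


def peak_concurrency(intervals):
    """Return the maximum number of (start, end) intervals overlapping at once.

    A task ending at time t is counted as finished before one starting at t.
    """
    events = []
    for start, end in intervals:
        events.append((start, 1))   # task starts
        events.append((end, -1))    # task ends
    # Tuples sort by time first; at equal times, -1 (end) sorts before +1.
    events.sort()
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


def expected_cap(dag_concurrency, parallelism):
    # Assumes one active DAG and no pools, so both limits apply directly.
    return min(dag_concurrency, parallelism)


# Example matching the report: 25 tasks all running during [0, 10).
observed = [(0, 10)] * 25
print('{} {}'.format(peak_concurrency(observed), expected_cap(8, 25)))  # 25 8
```

A peak above the expected cap, as in this example, is the symptom described in the issue.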



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
