airflow-commits mailing list archives

From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1631) LocalExecutor does not maintain contract of unbound parallelism (0 value)
Date Tue, 17 Oct 2017 18:40:01 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208134#comment-16208134 ]

ASF subversion and git services commented on AIRFLOW-1631:
----------------------------------------------------------

Commit cdfced3248c7f14b639919c093f4f3042deb754b in incubator-airflow's branch refs/heads/master
from [~erod]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=cdfced3 ]

[AIRFLOW-1631] Fix local executor unbound parallelism

Before, if unlimited parallelism was requested by passing `0` as the
parallelism value, the local executor would stall execution because no
worker was created, violating the BaseExecutor contract on the
parallelism option.

Now, if unbounded parallelism is used, a process is created on demand
for each task submitted for execution.

Closes #2658 from edgarRd/erod-localexecutor-fix
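The BaseExecutor contract the commit refers to ("``0`` for infinity") can be read as a rule for how many queued tasks may be started on each scheduling pass. Below is a minimal sketch of that rule. It uses a hypothetical helper, `open_slots`, that is not part of Airflow's API. The helper takes the parallelism setting plus counts of running and queued tasks.

```python
def open_slots(parallelism: int, running: int, queued: int) -> int:
    """Hypothetical helper: how many queued tasks may start right now.

    parallelism == 0 means "unbounded", so every queued task gets a slot;
    otherwise at most `parallelism` tasks may be running at once.
    """
    if parallelism < 0:
        raise ValueError("parallelism must be >= 0")
    if parallelism == 0:
        return queued  # unbounded: never hold a task back
    # Bounded: free slots, capped by the number of tasks waiting, never negative.
    return max(0, min(queued, parallelism - running))


print(open_slots(0, running=5, queued=3))   # → 3
print(open_slots(4, running=3, queued=10))  # → 1
print(open_slots(4, running=4, queued=2))   # → 0
```

An executor that honors the contract must therefore have somewhere to run all `queued` tasks when `parallelism` is `0`. The original `LocalExecutor#start` could not do this, because it created zero workers.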


> LocalExecutor does not maintain contract of unbound parallelism (0 value)
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1631
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1631
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor
>    Affects Versions: 1.8.1
>            Reporter: Edgar Rodriguez
>            Assignee: Edgar Rodriguez
>
> *Location*
> {{airflow/executors/local_executor.py:LocalExecutor#start}}:
> {code}
> def start(self):
>     self.queue = multiprocessing.JoinableQueue()
>     self.result_queue = multiprocessing.Queue()
>     self.workers = [
>         LocalWorker(self.queue, self.result_queue)
>         for _ in range(self.parallelism)
>     ]
>     for w in self.workers:
>         w.start()
> {code}
> *Description*
> When the *{{PARALLELISM}}* configuration value is set to {{0}}, the local executor stalls computation because it does not create any workers. As described in base_executor:
> {code}
> :param parallelism: how many jobs should run at one time. Set to
>             ``0`` for infinity
> :type parallelism: int
> {code}
> Hence, this contract is not maintained in {{LocalExecutor}}.
> *Remediation*
> In the context of the local executor, if parallelism is unbounded then, in theory, every submitted task should run immediately in its own process. We could therefore spawn a process for each submitted task without using a worker queue, and terminate that worker once its task completes.
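The remediation can be sketched as an executor with two strategies. This is a minimal sketch under several assumptions:

* It uses Python's `multiprocessing` with the POSIX-only `"fork"` start method.
* Tasks are assumed to be shell command strings.
* `SketchLocalExecutor`, `_run_command` and `_queued_worker` are hypothetical names, not Airflow's actual classes.

With `parallelism == 0`, `execute_async` starts one short-lived process per task and no queue is used for dispatch. Otherwise, a fixed pool of `parallelism` workers pulls from a shared task queue, as the original `start()` intended.

```python
import multiprocessing
import subprocess

# Assumption: the POSIX-only "fork" start method, so worker functions defined
# in this module do not have to be importable by the child processes.
_CTX = multiprocessing.get_context("fork")


def _run_command(key, command, result_queue):
    """Run one shell command and report (key, state) back to the parent."""
    try:
        subprocess.check_call(command, shell=True)
        state = "success"
    except (subprocess.CalledProcessError, OSError):
        state = "failed"
    result_queue.put((key, state))


def _queued_worker(task_queue, result_queue):
    """Long-lived worker for bounded parallelism; stops on a None sentinel."""
    while True:
        item = task_queue.get()
        if item is None:
            return
        key, command = item
        _run_command(key, command, result_queue)


class SketchLocalExecutor:
    """Hypothetical executor honoring "parallelism == 0 means unbounded"."""

    def __init__(self, parallelism):
        self.parallelism = parallelism
        self.submitted = 0
        self.results = {}

    def start(self):
        self.result_queue = _CTX.Queue()
        self.processes = []
        if self.parallelism == 0:
            return  # unbounded: processes are created per task in execute_async
        # Bounded: a fixed pool of workers pulling from a shared task queue.
        self.task_queue = _CTX.Queue()
        for _ in range(self.parallelism):
            p = _CTX.Process(target=_queued_worker,
                             args=(self.task_queue, self.result_queue))
            p.start()
            self.processes.append(p)

    def execute_async(self, key, command):
        self.submitted += 1
        if self.parallelism == 0:
            # One short-lived process per task; it exits when the task ends.
            p = _CTX.Process(target=_run_command,
                             args=(key, command, self.result_queue))
            p.start()
            self.processes.append(p)
        else:
            self.task_queue.put((key, command))

    def end(self):
        """Wait for every submitted task and return {key: state}."""
        if self.parallelism != 0:
            for _ in self.processes:
                self.task_queue.put(None)  # one sentinel per pooled worker
        # Drain results before joining, so no child blocks on an unread pipe.
        for _ in range(self.submitted):
            key, state = self.result_queue.get()
            self.results[key] = state
        for p in self.processes:
            p.join()
        return self.results
```

For example, `SketchLocalExecutor(0)` followed by `start()` and three `execute_async(...)` calls leaves three processes in `processes`, one per task. `SketchLocalExecutor(2)` always has exactly two. Results are drained before the processes are joined. This follows the `multiprocessing` programming guidelines, which warn that a process that has put items on a queue may not terminate until those items have been consumed.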



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
