airflow-dev mailing list archives

From "Van Klaveren, Brian N." <>
Subject Re: Introducing a "LAUNCHED" state into airflow
Date Thu, 30 Nov 2017 19:01:33 GMT
It can potentially add value, at least to the user. The user knows they may be undersubscribed
on workers, whether those are k8s or something else.

You'll want to think about how execution_timeout relates to this, right?

In a different system I work on, we have:
READY, QUEUED, SUBMITTED, and RUNNING states, where SUBMITTED is similarly only
useful for executors that have their own queue (like traditional batch systems). Our
execution timeout applies only to the RUNNING state. We also log the transition times
for those states, which is useful for analysis.
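For what it's worth, the shape of that state machine can be sketched as follows (hypothetical names, not our actual code): the key points are that transition timestamps are recorded as states change, and the execution-timeout clock only starts once the task reaches RUNNING.

```python
import time
from enum import Enum


class TaskState(Enum):
    READY = "ready"
    QUEUED = "queued"
    SUBMITTED = "submitted"  # only meaningful for executors with their own queue
    RUNNING = "running"


class TaskRecord:
    """Tracks a task's state and logs transition times for later analysis."""

    def __init__(self):
        self.state = TaskState.READY
        self.transitions = {TaskState.READY: time.time()}

    def transition(self, new_state):
        # Record when each state was entered; useful for analytics later.
        self.state = new_state
        self.transitions[new_state] = time.time()

    def timed_out(self, execution_timeout):
        # The timeout clock starts only when the task is actually RUNNING,
        # not while it sits SUBMITTED in an external queue.
        if self.state is not TaskState.RUNNING:
            return False
        return time.time() - self.transitions[TaskState.RUNNING] > execution_timeout
```

A task that spends an hour SUBMITTED in a busy batch queue therefore never trips its execution timeout; only time spent RUNNING counts.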


> On Nov 30, 2017, at 10:40 AM, Alex Guziel <> wrote:
> I think the more sensible thing here is just to set the state to RUNNING
> immediately in the airflow run process. I don't think the distinction
> between launched and running adds much value.
> On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <> wrote:
>> @Alex
>> That could potentially work, since if you have the same task launched twice
>> the second launch would die due to the "already running" dependency check.
>> Still, this is less ideal than not launching that task at all, since it still
>> allows for race conditions. @grant, thoughts on this?
>> On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <> wrote:
>>> It might be good enough to have RUNNING set immediately when the process
>>> runs, rather than depending on the dag file being parsed. It is annoying
>>> here too when dags parse on the scheduler but not the worker, since queued
>>> tasks that don't heartbeat will not get retried, while running tasks will.
>>> On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <> wrote:
>>>> ---Opening up this conversation to the whole mailing list, as suggested
>>>> by Bolke---
>>>> A "launched" state has been suggested in the past (see here
>>>> <airflow/utils/>) but never implemented, for reasons unknown to us. Does
>>>> anyone have more details about why?
>>>> There are two big reasons why adding a new "launched" state to airflow
>>>> would be useful:
>>>> 1. A "launched" state would be useful for crash safety of the scheduler.
>>>> If the scheduler crashes between launching the task and the task process
>>>> starting up, we lose information about whether that task was launched or
>>>> not. By moving the task's state to "launched" when it is sent off to
>>>> celery/dask/kubernetes/etc., you know after a crash whether you have to
>>>> relaunch the task or not.
>>>> To work around this issue, on startup of the kubernetes executor we query
>>>> all "queued" tasks, and if there is no matching kubernetes pod for a task
>>>> we set its state to "None" so it is rescheduled. See here
>>>> <kubernetes-executor/airflow/contrib/executors/kubernetes_> for details
>>>> if you are curious. While this works for the kubernetes executor, other
>>>> executors can't easily introspect launched tasks, which means the celery
>>>> executor (afaik) is not crash safe.
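That reconciliation step can be sketched roughly like this (hypothetical helper names, not the actual kubernetes executor code): on startup, compare the tasks the database says are QUEUED against the pods the cluster actually knows about, and reset any orphans.

```python
def reconcile_on_startup(queued_task_ids, live_pod_task_ids, set_task_state):
    """Reset queued tasks that never made it to a pod.

    queued_task_ids:   ids of tasks the metadata DB says are QUEUED
    live_pod_task_ids: ids extracted from pods currently known to kubernetes
    set_task_state:    callback persisting a new state (None = reschedule)
    """
    # Any task the DB thinks is queued but the cluster has never seen
    # was lost between "scheduler sent it" and "pod appeared".
    orphaned = set(queued_task_ids) - set(live_pod_task_ids)
    for task_id in orphaned:
        set_task_state(task_id, None)  # None makes the scheduler retry it
    return orphaned
```

With a first-class "launched" state, this set difference would be unnecessary: the state itself would record whether the hand-off to the cluster completed.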
>>>> 2. A "launched" state would allow for dynamic backpressure of tasks, not
>>>> just static backpressure. Right now, airflow only allows static
>>>> backpressure (the `parallelism` config). This means you must statically
>>>> say "I only want to allow N running tasks at once." Imagine you have lots
>>>> of tasks being scheduled on your celery/kubernetes cluster; since the
>>>> resource usage of each task is heterogeneous, you don't know exactly how
>>>> many running tasks you can tolerate at once. If instead you can say "I
>>>> only want tasks to be launched while I have fewer than N tasks in the
>>>> launched state", you get adaptive backpressure.
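The gating logic being proposed can be sketched as follows (hypothetical names, not Airflow code): drain the queue only while the count of tasks sitting in LAUNCHED stays below the window.

```python
def drain_queue(queued, count_launched, launch, max_launched):
    """Launch tasks from `queued` while the LAUNCHED window has room.

    count_launched: callable returning how many tasks are currently LAUNCHED
                    (submitted to the cluster but not yet RUNNING)
    launch:         callable submitting one task (moves it to LAUNCHED)
    """
    launched_now = []
    while queued and count_launched() < max_launched:
        task = queued.pop(0)
        launch(task)
        launched_now.append(task)
    return launched_now
```

The window adapts on its own: as the cluster starts tasks and they move from LAUNCHED to RUNNING, `count_launched()` drops and the next scheduler loop submits more work.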
>>>> While we have the workarounds described above for the kubernetes
>>>> executor, how do people feel about introducing a launched state into
>>>> airflow so we don't need the workarounds? I think there are benefits to
>>>> be gained for all the executors.
>>>> On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <> wrote:
>>>>> Hi Daniel,
>>>>> (BTW: I do think this discussion is better to have on the mailing list;
>>>>> more people might want to chime in and offer valuable opinions.)
>>>>> Jumping right in: I am wondering if you are not duplicating the "queued"
>>>>> logic for (a.o.) pools. Introducing LAUNCHED with the meaning you
>>>>> describe attached to it would mean that we have a second place where we
>>>>> handle back pressure.
>>>>> Isn't there a way to ask the k8s cluster how many tasks it has pending,
>>>>> and just execute any queued tasks when it crosses a certain threshold?
>>>>> Have a look at base_executor, where it is handling slots and queued
>>>>> tasks.
>>>>> Cheers
>>>>> Bolke
>>>>> Sent from my iPad
>>>>> On Nov 15, 2017, at 01:39, Daniel Imberman <> wrote:
>>>>> Hi Bolke and Dan!
>>>>> I had a quick question WRT the launched state (flow/utils/
>>>>> We are handling the issue of throttling the executor when the k8s
>>>>> cluster has more than 5 pending tasks (which usually means the cluster
>>>>> is under a lot of strain), and one thought we had WRT crash safety was
>>>>> to use a "LAUNCHED" state for pods that have been submitted but are not
>>>>> yet running.
>>>>> With the launched state currently being TBD, I was wondering if there
>>>>> was any reason you guys would not want this state? There are other
>>>>> workarounds we can do, but we wanted to check in with you guys first.
>>>>> Thanks!
>>>>> Daniel
