airflow-dev mailing list archives

From Bolke de Bruin <bdbr...@gmail.com>
Subject Re: Introducing a "LAUNCHED" state into airflow
Date Mon, 04 Dec 2017 20:10:33 GMT
As mentioned on the PR, for manually triggered dag runs this is fine. However, for backfilled
ones it is not, as the actual backfill might still be running. The backfill mechanism will fail
the dag run in that case.

So as I understand the issue, the fix needs to be more comprehensive for backfills.

Bolke

Sent from my iPad

> On 4 Dec 2017 at 21:03, Grant Nicholas <grantnicholas2015@u.northwestern.edu> wrote:
> 
> I opened a PR addressing the issue I found above, please take a look when
> you get a chance:
> 
> https://github.com/apache/incubator-airflow/pull/2843
> 
> I could not find a reason why we would want to exclude backfilled and
> externally triggered dagruns from the "reset orphaned task instances" check
> so I removed the special casing. (Anyone with expertise in this area,
> please let me know if there is a reason).
> This means that all dagruns should rightfully be crash safe again after
> this PR.
> 
> On Fri, Dec 1, 2017 at 11:57 AM, Maxime Beauchemin <
> maximebeauchemin@gmail.com> wrote:
> 
>> Oh Alex, I didn't know there was a task instance audit feature already.
>> Curious to learn about it; do you have a pointer or a short description?
>> 
>> Max
>> 
>> On Fri, Dec 1, 2017 at 9:26 AM, Grant Nicholas <
>> grantnicholas2015@u.northwestern.edu> wrote:
>> 
>>> @Alex just tested this and found the issue. It only resets the state of
>>> tasks for dagruns with "external_trigger=False". You can see where this
>>> is explicitly specified in the query here:
>>> https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L242
>>> 
>>> Is this intended behavior? I can say for us, this caused confusion because
>>> the dagruns created through the UI default to "external_trigger=True" and
>>> dagruns created through the CLI with "airflow trigger_dag {dag_id} -r
>>> {run_id}" also default to "external_trigger=True". This means that dags
>>> that are manually triggered are NOT crash safe by default.
>>> 
>>> Is there a reason why we would only want this behavior for
>>> "external_trigger=False" dags?
>>> 
>>> On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <alex.guziel@airbnb.com.invalid> wrote:
>>> 
>>>> See reset_state_for_orphaned_tasks in jobs.py
>>>> 
>>>> On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <alex.guziel@airbnb.com>
>>>> wrote:
>>>> 
>>>>> Right now the scheduler re-launches all QUEUED tasks on restart (there
>>>>> are safeguards for duplicates).
>>>>> 
>>>>> On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas
>> <grantnicholas2015@u.
>>>>> northwestern.edu> wrote:
>>>>> 
>>>>>> @Alex
>>>>>> I agree setting the RUNNING state immediately when `airflow run` starts
>>>>>> up would be useful on its own, but it doesn't solve all the problems.
>>>>>> What happens if you have a task in the QUEUED state (that may or may not
>>>>>> have been launched) and your scheduler crashes? What does the scheduler
>>>>>> do on startup: does it launch all QUEUED tasks again (knowing that there
>>>>>> may be duplicate tasks), or does it not launch the QUEUED tasks again
>>>>>> (knowing that the task may be stuck in the QUEUED state forever)? Right
>>>>>> now, airflow does the latter, which I think is not correct, as you can
>>>>>> potentially have tasks stuck in the QUEUED state forever.
>>>>>> 
>>>>>> Using a LAUNCHED state would explicitly keep track of whether tasks were
>>>>>> submitted for execution or not. At that point it's up to your messaging
>>>>>> system/queueing system/etc to be crash safe, and that is something you
>>>>>> get for free with kubernetes and it's something you can tune with celery
>>>>>> persistent queues.
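
As a concrete illustration of the celery tuning mentioned above, these are real
Celery/kombu settings, though whether they actually make a deployment crash safe
also depends on the broker; treat this as a hedged sketch, not a recipe:

    from celery import Celery
    from kombu import Queue

    app = Celery('airflow')                  # hypothetical app name
    app.conf.task_acks_late = True           # ack only after the task finishes,
                                             # so a dead worker's tasks are redelivered
    app.conf.task_default_delivery_mode = 2  # persist messages to disk
    app.conf.task_queues = [Queue('default', durable=True)]  # queue survives broker restart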
>>>>>> 
>>>>>> Note: Another option is to NOT introduce a new state but have airflow
>>>>>> launch QUEUED tasks again on startup of the executor. This would mean
>>>>>> that we may launch duplicate tasks, but this should not be an issue
>>>>>> since we have built-in protections on worker startup to avoid having two
>>>>>> RUNNING task instances at once.
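
A hypothetical sketch of that second option; queue_task_instance does exist on
BaseExecutor, but the helper and its startup hook are purely illustrative:

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def relaunch_queued_on_startup(session, executor):
        """Illustrative only: resubmit anything still QUEUED after a crash."""
        orphans = (session.query(TaskInstance)
                   .filter(TaskInstance.state == State.QUEUED)
                   .all())
        for ti in orphans:
            # Double-submitting is tolerable here: the worker-side "task
            # already running" dependency check kills the duplicate on startup.
            executor.queue_task_instance(ti)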
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <
>>>>>> alex.guziel@airbnb.com.invalid> wrote:
>>>>>> 
>>>>>>> I think the more sensible thing here is just to set the state to
>>>>>>> RUNNING immediately in the airflow run process. I don't think the
>>>>>>> distinction between launched and running adds much value.
>>>>>>> 
>>>>>>> On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <daniel.imberman@gmail.com> wrote:
>>>>>>> 
>>>>>>>> @Alex
>>>>>>>> 
>>>>>>>> That could potentially work, since if you have the same task launched
>>>>>>>> twice then the second time would die due to the "already running"
>>>>>>>> dependency. Still, it is less ideal than not launching that task at
>>>>>>>> all, since it still allows for race conditions. @grant thoughts on
>>>>>>>> this?
>>>>>>>> 
>>>>>>>> On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <alex.guziel@airbnb.com.invalid> wrote:
>>>>>>>> 
>>>>>>>>> It might be good enough to have RUNNING set immediately on the
>>>>>>>>> process run and not be dependent on the dag file being parsed. It is
>>>>>>>>> annoying here too when dags parse on the scheduler but not the
>>>>>>>>> worker, since queued tasks that don't heartbeat will not get retried,
>>>>>>>>> while running tasks will.
>>>>>>>>> 
>>>>>>>>> On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <
>>>>>>>>> grantnicholas2015@u.northwestern.edu> wrote:
>>>>>>>>> 
>>>>>>>>>> ---Opening up this conversation to the whole mailing list, as
>>>>>>>>>> suggested by Bolke---
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> A "launched" state has been suggested in the past
(see here
>>>>>>>>>> <https://github.com/apache/incubator-airflow/blob/master/
>>>>>>>>>> airflow/utils/state.py#L31>)
>>>>>>>>>> but never implemented for reasons unknown to us.
Does anyone
>>>> have
>>>>>>> more
>>>>>>>>>> details about why?
>>>>>>>>>> 
>>>>>>>>>> There are two big reasons why adding a new "launched" state to
>>>>>>>>>> airflow would be useful:
>>>>>>>>>> 
>>>>>>>>>> 1. A "launched" state would be useful for crash safety
of
>> the
>>>>>>>> scheduler.
>>>>>>>>> If
>>>>>>>>>> the scheduler crashes in between the scheduler launching
the
>>>> task
>>>>>> and
>>>>>>>> the
>>>>>>>>>> task process starting up then we lose information
about
>>> whether
>>>>>> that
>>>>>>>> task
>>>>>>>>>> was launched or not. By moving the state of the task
to
>>>> "launched"
>>>>>>> when
>>>>>>>>> it
>>>>>>>>>> is sent off to celery/dask/kubernetes/etc, when crashes
>> happen
>>>> you
>>>>>>> know
>>>>>>>>>> whether you have to relaunch the task or not.
>>>>>>>>>> 
>>>>>>>>>> To work around this issue, on startup of the kubernetes executor we
>>>>>>>>>> query all "queued" tasks, and if there is no matching kubernetes pod
>>>>>>>>>> for a task then we set the task state to "None" so it is
>>>>>>>>>> rescheduled. See here
>>>>>>>>>> <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
>>>>>>>>>> for details if you are curious. While this works for the kubernetes
>>>>>>>>>> executor, other executors can't easily introspect launched tasks,
>>>>>>>>>> which means the celery executor (afaik) is not crash safe.
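
Roughly, the workaround Grant links amounts to something like the following, a
paraphrase using the official kubernetes python client; the function name and pod
labels are approximate, see the linked kubernetes_executor.py for the real code:

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def reset_unlaunched_queued_tasks(session, kube_client, namespace):
        queued = (session.query(TaskInstance)
                  .filter(TaskInstance.state == State.QUEUED)
                  .all())
        for ti in queued:
            # Look for a pod carrying this task's identity labels...
            selector = 'dag_id={},task_id={}'.format(ti.dag_id, ti.task_id)
            pods = kube_client.list_namespaced_pod(namespace,
                                                   label_selector=selector)
            if not pods.items:
                # ...and if none exists, the scheduler died before launch:
                # clear the state so the task is rescheduled.
                ti.state = State.NONE
        session.commit()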
>>>>>>>>>> 
>>>>>>>>>> 2. A "launched" state would allow for dynamic backpressure
>> of
>>>>>> tasks,
>>>>>>>> not
>>>>>>>>>> just static backpressure. Right now, airflow only
allows
>>> static
>>>>>>>>>> backpressure (`parallelism` config).This means you
must
>>>> statically
>>>>>>> say
>>>>>>>> I
>>>>>>>>>> only want to allow N running tasks at once. Imagine
you have
>>>> lots
>>>>>> of
>>>>>>>>> tasks
>>>>>>>>>> being scheduled on your celery cluster/kubernetes
cluster
>> and
>>>>>> since
>>>>>>> the
>>>>>>>>>> resource usage of each task is heterogenous you don't
know
>>>> exactly
>>>>>>> how
>>>>>>>>> many
>>>>>>>>>> running tasks you can tolerate at once. If instead
you can
>> say
>>>> "I
>>>>>>> only
>>>>>>>>> want
>>>>>>>>>> tasks to be launched while I have less than N tasks
in the
>>>>>> launched
>>>>>>>>> state"
>>>>>>>>>> you get some adaptive backpressure.
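
To make the proposal concrete, a hypothetical gate; since LAUNCHED is not a real
state today, the 'launched' value and the helper below are purely illustrative:

    from airflow.models import TaskInstance

    def has_launch_capacity(session, max_launched):
        """Hypothetical adaptive backpressure check on a LAUNCHED state."""
        in_flight = (session.query(TaskInstance)
                     .filter(TaskInstance.state == 'launched')
                     .count())
        # Unlike the static `parallelism` cap, this adapts to how quickly the
        # cluster actually moves tasks from "submitted" to RUNNING.
        return in_flight < max_launched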
>>>>>>>>>> 
>>>>>>>>>> While we have the workarounds described above for the kubernetes
>>>>>>>>>> executor, how do people feel about introducing a launched state into
>>>>>>>>>> airflow so we don't need the workarounds? I think there are benefits
>>>>>>>>>> to be gained for all the executors.
>>>>>>>>>> 
>>>>>>>>>> On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>> 
>>>>>>>>>>> (BTW: I do think this discussion is better to have on the mailing
>>>>>>>>>>> list; more people might want to chime in and offer valuable
>>>>>>>>>>> opinions)
>>>>>>>>>>> 
>>>>>>>>>>> Jumping right in: I am wondering whether you are not duplicating
>>>>>>>>>>> the "queued" logic for (among others) pools. Introducing LAUNCHED,
>>>>>>>>>>> with the meaning you describe attached to it, would mean that we
>>>>>>>>>>> have a second place where we handle back pressure.
>>>>>>>>>>> 
>>>>>>>>>>> Isn’t there a way to ask the k8s cluster how many tasks it has
>>>>>>>>>>> pending and just execute any queued tasks when it crosses a certain
>>>>>>>>>>> threshold? Have a look at base_executor, where it handles slots and
>>>>>>>>>>> queued tasks.
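
The slot handling Bolke points to lives in BaseExecutor.heartbeat; from memory it is
roughly the following (a paraphrase, not the literal source):

    # Paraphrase of BaseExecutor.heartbeat's slot accounting (approximate):
    def heartbeat(self):
        open_slots = self.parallelism - len(self.running)
        # Take the highest-priority queued tasks first.
        sorted_queue = sorted(self.queued_tasks.items(),
                              key=lambda kv: kv[1][1],  # priority weight
                              reverse=True)
        for _ in range(min(open_slots, len(sorted_queue))):
            key, (command, _priority, queue, ti) = sorted_queue.pop(0)
            self.queued_tasks.pop(key)
            self.running[key] = command
            self.execute_async(key, command, queue=queue)
        self.sync()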
>>>>>>>>>>> 
>>>>>>>>>>> Cheers
>>>>>>>>>>> Bolke
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>> 
>>>>>>>>>>> On 15 Nov 2017 at 01:39, Daniel Imberman <daniel.imberman@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Bolke and Dan!
>>>>>>>>>>> 
>>>>>>>>>>> I had a quick question WRT the launched state
>>>>>>>>>>> (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
>>>>>>>>>>> 
>>>>>>>>>>> We are handling the issue of throttling the executor when the k8s
>>>>>>>>>>> cluster has more than 5 pending tasks (which usually means that the
>>>>>>>>>>> cluster is under a lot of strain), and one thought we had WRT crash
>>>>>>>>>>> safety was to use a "LAUNCHED" state for pods that have been
>>>>>>>>>>> submitted but are not running yet.
>>>>>>>>>>> 
>>>>>>>>>>> With the launched state currently being TBD, I was wondering if
>>>>>>>>>>> there was any reason you guys would not want this state? There are
>>>>>>>>>>> other workarounds we can do, but we wanted to check in with you
>>>>>>>>>>> guys first.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> 
>>>>>>>>>>> Daniel
