airflow-dev mailing list archives

From Grant Nicholas <grantnicholas2...@u.northwestern.edu>
Subject Re: Introducing a "LAUNCHED" state into airflow
Date Thu, 30 Nov 2017 19:13:05 GMT
@Alex
I agree setting the RUNNING state immediately when `airflow run` starts up
would be useful on its own, but it doesn't solve all the problems. What
happens if you have a task in the QUEUED state (that may or may not have
been launched) and your scheduler crashes? What should the scheduler do on
startup: launch all QUEUED tasks again (knowing that this may produce
duplicate tasks), or not launch the QUEUED tasks again (knowing that a task
may be stuck in the QUEUED state forever)? Right now, airflow does the
latter, which I think is not correct, as you can potentially have tasks
stuck in the QUEUED state forever.

Using a LAUNCHED state would explicitly keep track of whether tasks were
submitted for execution or not. At that point it's up to your messaging
system/queueing system/etc. to be crash safe, which is something you get
for free with kubernetes and something you can tune with celery
persistent queues.
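
For concreteness, a minimal self-contained sketch of what that transition
could look like (stand-in names and a toy TaskInstance, not Airflow's actual
State or TaskInstance code):

    # Sketch only: the scheduler records LAUNCHED the moment it hands the
    # task to the executor backend, so a crash between "submitted" and
    # "running" leaves an unambiguous record in the metadata database.
    QUEUED, LAUNCHED, RUNNING = "queued", "launched", "running"

    class TaskInstanceStub:
        """Stand-in for a TaskInstance row; only the state field matters here."""
        def __init__(self, task_id):
            self.task_id = task_id
            self.state = QUEUED

    def submit_to_executor(ti, send_to_backend):
        """Hypothetical helper: persist LAUNCHED before the backend accepts the task."""
        ti.state = LAUNCHED          # recorded first, so a crash cannot lose this fact
        send_to_backend(ti.task_id)  # e.g. a celery message or a kubernetes pod create

    if __name__ == "__main__":
        ti = TaskInstanceStub("example_task")
        submit_to_executor(ti, send_to_backend=lambda tid: print("submitted %s" % tid))
        print(ti.state)  # "launched": submitted for execution but not yet heartbeating
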

Note: another option is to NOT introduce a new state but to have airflow
launch QUEUED tasks again on startup of the executor. This would mean that
we may launch duplicate tasks, but this should not be an issue since we
have built-in protections on worker startup to avoid having two RUNNING
task instances at once.
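
A rough sketch of that alternative (hypothetical names, not the real
executor API): re-enqueue every task still marked QUEUED on executor
startup, and rely on the existing worker-side "not already running" check
to abort any duplicate that had in fact been launched before the crash.

    QUEUED, RUNNING = "queued", "running"

    def relaunch_queued_on_startup(task_instances, enqueue):
        """Re-submit all QUEUED tasks; duplicates are rejected later at worker startup."""
        for ti in task_instances:
            if ti["state"] == QUEUED:
                enqueue(ti["task_id"])

    def worker_startup_guard(current_state):
        """Stand-in for the built-in protection: refuse to start a second RUNNING copy."""
        if current_state == RUNNING:
            raise RuntimeError("task instance already running; duplicate launch aborted")

    if __name__ == "__main__":
        tis = [{"task_id": "a", "state": QUEUED}, {"task_id": "b", "state": RUNNING}]
        relaunch_queued_on_startup(tis, enqueue=lambda tid: print("re-enqueued %s" % tid))
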



On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <
alex.guziel@airbnb.com.invalid> wrote:

> I think the more sensible thing here is to just to set the state to RUNNING
> immediately in the airflow run process. I don't think the distinction
> between launched and running adds much value.
>
> On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <
> daniel.imberman@gmail.com
> > wrote:
>
> > @Alex
> >
> > That could potentially work since if you have the same task launched twice
> > then the second time would die due to the "already running dependency".
> > Still less ideal than not launching that task at all since it still allows
> > for race conditions. @grant thoughts on this?
> >
> > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <alex.guziel@airbnb.com.invalid>
> > wrote:
> >
> > > It might be good enough to have RUNNING set immediately on the process run
> > > and not being dependent on the dag file being parsed. It is annoying here
> > > too when dags parse on the scheduler but not the worker, since queued tasks
> > > that don't heartbeat will not get retried, while running tasks will.
> > >
> > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <
> > > grantnicholas2015@u.northwestern.edu> wrote:
> > >
> > > > ---Opening up this conversation to the whole mailing list, as suggested
> > > > by Bolke---
> > > >
> > > >
> > > > A "launched" state has been suggested in the past (see here
> > > > <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
> > > > but never implemented for reasons unknown to us. Does anyone have more
> > > > details about why?
> > > >
> > > > There are two big reasons why adding a new "launched" state to airflow
> > > > would be useful:
> > > >
> > > > 1. A "launched" state would be useful for crash safety of the scheduler.
> > > > If the scheduler crashes between launching the task and the task process
> > > > starting up, then we lose information about whether that task was
> > > > launched or not. By moving the state of the task to "launched" when it
> > > > is sent off to celery/dask/kubernetes/etc, when crashes happen you know
> > > > whether you have to relaunch the task or not.
> > > >
> > > > To work around this issue, on startup of the kubernetes executor we query
> > > > all "queued" tasks, and if there is not a matching kubernetes pod for that
> > > > task then we set the task state to "None" so it is rescheduled. See here
> > > > <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
> > > > for details if you are curious. While this works for the kubernetes
> > > > executor, other executors can't easily introspect launched tasks, and this
> > > > means the celery executor (afaik) is not crash safe.
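
A simplified sketch of the workaround described in that paragraph (the real
code is in the linked kubernetes_executor.py; `list_pod_task_ids` and the
dict-based task records below are placeholders, not actual APIs):

    QUEUED = "queued"

    def clear_orphaned_queued_tasks(queued_tasks, list_pod_task_ids):
        """Reset QUEUED tasks that have no corresponding pod so the scheduler retries them."""
        launched = set(list_pod_task_ids())      # task ids that actually have a pod
        for ti in queued_tasks:
            if ti["state"] == QUEUED and ti["task_id"] not in launched:
                ti["state"] = None               # back to no state -> gets rescheduled

    if __name__ == "__main__":
        tasks = [{"task_id": "t1", "state": QUEUED}, {"task_id": "t2", "state": QUEUED}]
        clear_orphaned_queued_tasks(tasks, list_pod_task_ids=lambda: ["t2"])
        print(tasks)  # t1 is reset to None (no pod found); t2 stays queued
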
> > > >
> > > > 2. A "launched" state would allow for dynamic backpressure of tasks,
> > not
> > > > just static backpressure. Right now, airflow only allows static
> > > > backpressure (`parallelism` config).This means you must statically
> say
> > I
> > > > only want to allow N running tasks at once. Imagine you have lots of
> > > tasks
> > > > being scheduled on your celery cluster/kubernetes cluster and since
> the
> > > > resource usage of each task is heterogenous you don't know exactly
> how
> > > many
> > > > running tasks you can tolerate at once. If instead you can say "I
> only
> > > want
> > > > tasks to be launched while I have less than N tasks in the launched
> > > state"
> > > > you get some adaptive backpressure.
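
A toy sketch of that adaptive backpressure check (names like `max_launched`
are illustrative, not an existing config option):

    LAUNCHED = "launched"

    def launch_with_backpressure(pending, states, submit, max_launched=5):
        """Submit pending tasks only while fewer than max_launched tasks sit in LAUNCHED."""
        for task_id in pending:
            in_flight = sum(1 for s in states.values() if s == LAUNCHED)
            if in_flight >= max_launched:
                break                            # cluster saturated; retry next scheduler loop
            submit(task_id)
            states[task_id] = LAUNCHED           # counts against the cap immediately

    if __name__ == "__main__":
        states = {"t%d" % i: LAUNCHED for i in range(4)}   # 4 tasks already in flight
        launch_with_backpressure(["t10", "t11"], states,
                                 submit=lambda tid: print("submitted %s" % tid))
        # only t10 is submitted; t11 waits because the cap of 5 LAUNCHED tasks was hit
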
> > > >
> > > > While we have workarounds described above for the kubernetes executor,
> > > > how do people feel about introducing a launched state into airflow so we
> > > > don't need the workarounds? I think there are benefits to be gained for
> > > > all the executors.
> > > >
> > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <bdbruin@gmail.com>
> > > > wrote:
> > > >
> > > > >
> > > > > Hi Daniel,
> > > > >
> > > > > (BTW: I do think this discussion is better to have at the mailing list;
> > > > > more people might want to chime in and offer valuable opinions)
> > > > >
> > > > > Jumping right in: I am wondering if you are not duplicating the "queued"
> > > > > logic for (among others) pools. Introducing LAUNCHED with the meaning
> > > > > attached to it that you describe would mean that we have a second place
> > > > > where we handle back pressure.
> > > > >
> > > > > Isn't there a way to ask the k8s cluster how many tasks it has pending
> > > > > and just to execute any queued tasks when it crosses a certain threshold?
> > > > > Have a look at base_executor, where it handles slots and queued tasks.
> > > > >
> > > > > Cheers
> > > > > Bolke
> > > > >
> > > > >
> > > > > Sent from my iPad
> > > > >
> > > > > On 15 Nov 2017, at 01:39, Daniel Imberman <daniel.imberman@gmail.com>
> > > > > wrote the following:
> > > > >
> > > > > Hi Bolke and Dan!
> > > > >
> > > > > I had a quick question WRT the launched state
> > > > > (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
> > > > >
> > > > > We are handling the issue of throttling the executor when the k8s cluster
> > > > > has more than 5 pending tasks (which usually means that the cluster is
> > > > > under a lot of strain), and one thought we had WRT crash safety was to use
> > > > > a "LAUNCHED" state for pods that have been submitted but are not running
> > > > > yet.
> > > > >
> > > > > With the launched state currently being TBD, I was wondering if there was
> > > > > any reason you guys would not want this state? There are other workarounds
> > > > > we can do, but we wanted to check in with you guys first.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Daniel
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
