airflow-dev mailing list archives

From Alex Guziel <alex.guz...@airbnb.com.INVALID>
Subject Re: Introducing a "LAUNCHED" state into airflow
Date Fri, 01 Dec 2017 09:19:31 GMT
The task instance audit is pretty good for debugging but maybe not as
useful to pure users.

The crashing SQLAlchemy thing is bad mainly in the sense of being bad
practice; handling it more gracefully wouldn't buy much more than just
increasing innodb_lock_wait_timeout in practice.
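For context, raising the timeout Alex refers to is a one-line change on the MySQL side. A minimal sketch via SQLAlchemy; the DSN and the 120-second value are placeholders:

    from sqlalchemy import create_engine, text

    # Placeholder DSN for the Airflow metadata database.
    engine = create_engine("mysql+pymysql://user:password@localhost/airflow")

    with engine.connect() as conn:
        # Applies to connections opened after the change and requires the
        # SUPER privilege. For a persistent setting, put
        # innodb_lock_wait_timeout = 120 in my.cnf instead.
        conn.execute(text("SET GLOBAL innodb_lock_wait_timeout = 120"))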

On Fri, Dec 1, 2017 at 12:56 AM Daniel (Daniel Lamblin) [BDP - Seoul]
<lamblin@coupang.com> wrote:

> On this tangent, our scheduler occasionally crashes when the db tells
> SQLAlchemy that there's a lock on the task it's trying to set as queued or
> running.
> Some (update query) retry logic in the (many) callers seems to be in order.
>
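A rough sketch of the retry logic Daniel suggests, assuming lock contention surfaces as a SQLAlchemy OperationalError; the helper name and retry parameters are illustrative, not existing Airflow code:

    import time
    from sqlalchemy.exc import OperationalError

    def commit_with_retry(session, retries=3, backoff=1.0):
        """Retry a session commit when the DB reports lock contention."""
        for attempt in range(retries):
            try:
                session.commit()
                return
            except OperationalError:
                # Typically "Lock wait timeout exceeded"; roll back and retry.
                session.rollback()
                if attempt == retries - 1:
                    raise
                time.sleep(backoff * (attempt + 1))

    # Usage: mutate the task instance, then commit with retries, e.g.
    # ti.state = State.QUEUED
    # commit_with_retry(session)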
> On 12/1/17, 2:19 PM, "Maxime Beauchemin" <maximebeauchemin@gmail.com>
> wrote:
>
>     Taking a tangent here:
>
>     I like the idea of logging every state change to another table.
>     Mutating task_instance from many places results in things that are
>     hard to debug in some cases.
>
>     As we need similar history-tracking of mutations on task_instances
>     around retries, we may want to keep track of history for anything
>     that touches task_instance.
>
>     It may be easy-ish to maintain this table using SQLAlchemy
>     after-update hooks on the model, where we'd systematically insert
>     into a task_instance_history table. Just a thought.
>
>     Max
>
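A minimal sketch of the after-update hook Max describes, assuming a hypothetical task_instance_history table defined alongside Airflow's existing TaskInstance model; the history model and its columns are illustrative:

    from sqlalchemy import Column, DateTime, Integer, String, event, func
    from sqlalchemy.ext.declarative import declarative_base

    from airflow.models import TaskInstance  # existing ORM model

    Base = declarative_base()

    class TaskInstanceHistory(Base):
        """Hypothetical audit table; not part of Airflow."""
        __tablename__ = "task_instance_history"
        id = Column(Integer, primary_key=True, autoincrement=True)
        task_id = Column(String(250))
        dag_id = Column(String(250))
        execution_date = Column(DateTime)
        state = Column(String(20))
        recorded_at = Column(DateTime, server_default=func.now())

    # The table would need to be created once, e.g. Base.metadata.create_all(engine).

    @event.listens_for(TaskInstance, "after_update")
    def record_state_change(mapper, connection, target):
        # Runs inside the same transaction as the UPDATE on task_instance.
        connection.execute(
            TaskInstanceHistory.__table__.insert().values(
                task_id=target.task_id,
                dag_id=target.dag_id,
                execution_date=target.execution_date,
                state=target.state,
            )
        )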
>     On Thu, Nov 30, 2017 at 11:26 AM, Grant Nicholas
>     <grantnicholas2015@u.northwestern.edu> wrote:
>
>     > Thanks, I see why that should work. I just know from testing this
>     > myself that I had to manually clear out old QUEUED task instances
>     > to get them to reschedule. I'll do some more testing to confirm;
>     > it's totally possible I did something wrong in our test suite setup.
>     >
>     > On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel
>     > <alex.guziel@airbnb.com.invalid> wrote:
>     >
>     > > See reset_state_for_orphaned_tasks in jobs.py
>     > >
>     > > On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel
>     > > <alex.guziel@airbnb.com> wrote:
>     > >
>     > > > Right now the scheduler re-launches all QUEUED tasks on
>     > > > restart (there are safeguards for duplicates).
>     > > >
>     > > > On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas
>     > > > <grantnicholas2015@u.northwestern.edu> wrote:
>     > > >
>     > > >> @Alex
>     > > >> I agree setting the RUNNING state immediately when `airflow run`
>     > > >> starts up would be useful on its own, but it doesn't solve all
>     > > >> the problems. What happens if you have a task in the QUEUED
>     > > >> state (that may or may not have been launched) and your
>     > > >> scheduler crashes? What does the scheduler do on startup: does
>     > > >> it launch all QUEUED tasks again (knowing that there may be
>     > > >> duplicate tasks), or does it not launch the QUEUED tasks again
>     > > >> (knowing that the task may be stuck in the QUEUED state
>     > > >> forever)? Right now, airflow does the latter, which I think is
>     > > >> not correct, as you can potentially have tasks stuck in the
>     > > >> QUEUED state forever.
>     > > >>
>     > > >> Using a LAUNCHED state would explicitly keep track of whether
>     > > >> tasks were submitted for execution or not. At that point it's
>     > > >> up to your messaging system/queueing system/etc to be crash
>     > > >> safe, and that is something you get for free with kubernetes
>     > > >> and something you can tune with celery persistent queues.
>     > > >>
>     > > >> Note: Another option is to NOT introduce a new state but have
>     > > >> airflow launch QUEUED tasks again on startup of the executor.
>     > > >> This would mean that we may launch duplicate tasks, but this
>     > > >> should not be an issue since we have built-in protections on
>     > > >> worker startup to avoid having two RUNNING task instances at
>     > > >> once.
>     > > >>
>     > > >>
>     > > >>
>     > > >> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel
>     > > >> <alex.guziel@airbnb.com.invalid> wrote:
>     > > >>
>     > > >> > I think the more sensible thing here is to just set the state
>     > > >> > to RUNNING immediately in the airflow run process. I don't
>     > > >> > think the distinction between launched and running adds much
>     > > >> > value.
>     > > >> >
>     > > >> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman
>     > > >> > <daniel.imberman@gmail.com> wrote:
>     > > >> >
>     > > >> > > @Alex
>     > > >> > >
>     > > >> > > That could potentially work since if you have the same task
>     > > >> > > launched twice then the second time would die due to the
>     > > >> > > "already running dependency". Still less ideal than not
>     > > >> > > launching that task at all since it still allows for race
>     > > >> > > conditions. @grant thoughts on this?
>     > > >> > >
>     > > >> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel
>     > > >> > > <alex.guziel@airbnb.com.invalid> wrote:
>     > > >> > >
>     > > >> > > > It might be good enough to have RUNNING set immediately on
>     > > >> > > > the process run and not be dependent on the dag file being
>     > > >> > > > parsed. It is annoying here too when dags parse on the
>     > > >> > > > scheduler but not the worker, since queued tasks that don't
>     > > >> > > > heartbeat will not get retried, while running tasks will.
>     > > >> > > >
>     > > >> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas
>     > > >> > > > <grantnicholas2015@u.northwestern.edu> wrote:
>     > > >> > > >
>     > > >> > > > > ---Opening up this conversation to the whole mailing
>     > > >> > > > > list, as suggested by Bolke---
>     > > >> > > > >
>     > > >> > > > > A "launched" state has been suggested in the past (see
>     > > >> > > > > here
>     > > >> > > > > <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
>     > > >> > > > > but never implemented, for reasons unknown to us. Does
>     > > >> > > > > anyone have more details about why?
>     > > >> > > > >
>     > > >> > > > > There are two big reasons why adding a new "launched"
>     > > >> > > > > state to airflow would be useful:
>     > > >> > > > >
>     > > >> > > > > 1. A "launched" state would be useful for crash safety
>     > > >> > > > > of the scheduler. If the scheduler crashes in between the
>     > > >> > > > > scheduler launching the task and the task process
>     > > >> > > > > starting up, then we lose information about whether that
>     > > >> > > > > task was launched or not. By moving the state of the task
>     > > >> > > > > to "launched" when it is sent off to
>     > > >> > > > > celery/dask/kubernetes/etc, when crashes happen you know
>     > > >> > > > > whether you have to relaunch the task or not.
>     > > >> > > > >
>     > > >> > > > > To work around this issue, on startup of the kubernetes
>     > > >> > > > > executor we query all "queued" tasks, and if there is not
>     > > >> > > > > a matching kubernetes pod for that task then we set the
>     > > >> > > > > task state to "None" so it is rescheduled. See here
>     > > >> > > > > <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
>     > > >> > > > > for details if you are curious. While this works for the
>     > > >> > > > > kubernetes executor, other executors can't easily
>     > > >> > > > > introspect launched tasks, and this means the celery
>     > > >> > > > > executor (afaik) is not crash safe.
>     > > >> > > > >
>     > > >> > > > > 2. A "launched" state would allow for dynamic
>     > > >> > > > > backpressure of tasks, not just static backpressure.
>     > > >> > > > > Right now, airflow only allows static backpressure (the
>     > > >> > > > > `parallelism` config). This means you must statically say
>     > > >> > > > > "I only want to allow N running tasks at once." Imagine
>     > > >> > > > > you have lots of tasks being scheduled on your celery
>     > > >> > > > > cluster/kubernetes cluster, and since the resource usage
>     > > >> > > > > of each task is heterogeneous you don't know exactly how
>     > > >> > > > > many running tasks you can tolerate at once. If instead
>     > > >> > > > > you can say "I only want tasks to be launched while I
>     > > >> > > > > have less than N tasks in the launched state," you get
>     > > >> > > > > some adaptive backpressure.
>     > > >> > > > >
>     > > >> > > > > While we have the workarounds described above for the
>     > > >> > > > > kubernetes executor, how do people feel about introducing
>     > > >> > > > > a launched state into airflow so we don't need the
>     > > >> > > > > workarounds? I think there are benefits to be gained for
>     > > >> > > > > all the executors.
>     > > >> > > > >
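A rough, self-contained sketch of the reset workaround Grant describes for the kubernetes executor, assuming the official kubernetes Python client and Airflow's provide_session helper; the function name and pod-label matching scheme are illustrative, not the actual Bloomberg implementation:

    from airflow.models import TaskInstance
    from airflow.utils.db import provide_session
    from airflow.utils.state import State
    from kubernetes import client, config

    @provide_session
    def reset_orphaned_queued_tasks(namespace="default", session=None):
        """Re-queue QUEUED task instances that have no matching pod (illustrative)."""
        config.load_kube_config()
        v1 = client.CoreV1Api()
        # Assumes worker pods carry an "airflow-worker" label plus dag_id/task_id
        # labels; purely illustrative.
        pods = v1.list_namespaced_pod(namespace, label_selector="airflow-worker")
        launched = {
            (p.metadata.labels.get("dag_id"), p.metadata.labels.get("task_id"))
            for p in pods.items
        }

        queued = session.query(TaskInstance).filter(
            TaskInstance.state == State.QUEUED
        ).all()
        for ti in queued:
            if (ti.dag_id, ti.task_id) not in launched:
                # No pod was ever created for this task instance: clear the
                # state so the scheduler picks it up again.
                ti.state = State.NONE
        session.commit()

A similar count of tasks in a hypothetical LAUNCHED state could serve as the adaptive backpressure gate described in point 2.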
>     > > >> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin
>     > > >> > > > > <bdbruin@gmail.com> wrote:
>     > > >> > > > >
>     > > >> > > > > >
>     > > >> > > > > > Hi Daniel,
>     > > >> > > > > >
>     > > >> > > > > > (BTW: I do think this discussion is better to have on
>     > > >> > > > > > the mailing list; more people might want to chime in
>     > > >> > > > > > and offer valuable opinions.)
>     > > >> > > > > >
>     > > >> > > > > > Jumping right in: I am wondering if you are not
>     > > >> > > > > > duplicating the "queued" logic for (among others)
>     > > >> > > > > > pools. Introducing LAUNCHED with the meaning you
>     > > >> > > > > > describe attached to it would mean that we have a
>     > > >> > > > > > second place where we handle back pressure.
>     > > >> > > > > >
>     > > >> > > > > > Isn't there a way to ask the k8s cluster how many
>     > > >> > > > > > tasks it has pending and just execute any queued tasks
>     > > >> > > > > > when it crosses a certain threshold? Have a look at
>     > > >> > > > > > base_executor where it is handling slots and queued
>     > > >> > > > > > tasks.
>     > > >> > > > > >
>     > > >> > > > > > Cheers
>     > > >> > > > > > Bolke
>     > > >> > > > > >
>     > > >> > > > > > Sent from my iPad
>     > > >> > > > > >
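A minimal sketch of the pending-task check Bolke suggests, using the official kubernetes Python client; the function name and threshold logic are illustrative:

    from kubernetes import client, config

    def pending_pod_count(namespace="default"):
        """Count pods the cluster has not started running yet (illustrative)."""
        config.load_kube_config()
        v1 = client.CoreV1Api()
        pods = v1.list_namespaced_pod(namespace, field_selector="status.phase=Pending")
        return len(pods.items)

    # The executor could then hold back queued tasks while the cluster is busy:
    # if pending_pod_count("airflow") < PENDING_THRESHOLD:
    #     launch_next_queued_task()   # hypothetical executor hook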
>     > > >> > > > > > On 15 Nov 2017, at 01:39, Daniel Imberman
>     > > >> > > > > > <daniel.imberman@gmail.com> wrote:
>     > > >> > > > > >
>     > > >> > > > > > Hi Bolke and Dan!
>     > > >> > > > > >
>     > > >> > > > > > I had a quick question WRT the launched state
>     > > >> > > > > > (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
>     > > >> > > > > >
>     > > >> > > > > > We are handling the issue of throttling the executor
>     > > >> > > > > > when the k8s cluster has more than 5 pending tasks
>     > > >> > > > > > (which usually means that the cluster is under a lot
>     > > >> > > > > > of strain), and one thought we had WRT crash safety
>     > > >> > > > > > was to use a "LAUNCHED" state for pods that have been
>     > > >> > > > > > submitted but are not running yet.
>     > > >> > > > > >
>     > > >> > > > > > With the launched state currently being TBD, I was
>     > > >> > > > > > wondering if there was any reason you guys would not
>     > > >> > > > > > want this state? There are other workarounds we can
>     > > >> > > > > > do, but we wanted to check in with you guys first.
>     > > >> > > > > >
>     > > >> > > > > > Thanks!
>     > > >> > > > > >
>     > > >> > > > > > Daniel
