airflow-dev mailing list archives

From Lance Norskog <lance.nors...@gmail.com>
Subject Re: Speeding up the scheduler - request for comments
Date Sun, 05 Jun 2016 05:21:50 GMT
Right, it would have to be committing to MySQL or Postgres.

Or have two implementations, one in stored procedures and one in Python as a
plug-in. It's an after-the-fact discussion now, though: programmers have
elbowed DBAs out of the way to control database access software.

On Sat, Jun 4, 2016 at 3:59 PM, Chris Riccomini <criccomini@apache.org>
wrote:

> I've done something similar to this as triggers in MySQL. It's hard to do
> this in a generic way, though. Every SQL DB has its own way of expressing
> procedures (or none at all).
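For illustration only, a minimal sketch of what such a MySQL trigger could
look like, installed from Python via SQLAlchemy. The task_state_change table,
the trigger body, and the connection string are assumptions made up for this
sketch, not something from this thread or from Airflow itself:

# Sketch: record task_instance state changes into a hypothetical
# task_state_change table. The dag_id/task_id/execution_date/state columns
# follow Airflow's task_instance schema; everything else is assumed.
from sqlalchemy import create_engine, text

DDL = """
CREATE TRIGGER ti_state_change
AFTER UPDATE ON task_instance
FOR EACH ROW
INSERT INTO task_state_change (dag_id, task_id, execution_date, old_state, new_state)
VALUES (NEW.dag_id, NEW.task_id, NEW.execution_date, OLD.state, NEW.state)
"""
# A real version would wrap the INSERT in IF OLD.state <> NEW.state ... END IF
# so it only records actual state changes; as noted above, the DDL is
# MySQL-specific.

engine = create_engine("mysql://airflow:airflow@localhost/airflow")  # assumed DSN
with engine.begin() as conn:
    conn.execute(text(DDL))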
>
> On Sat, Jun 4, 2016 at 3:32 PM, Lance Norskog <lance.norskog@gmail.com>
> wrote:
>
> > Does anybody have experience with writing this kind of state change
> > code in stored procedures?
> >
> > On Sat, Jun 4, 2016 at 6:39 AM, Bolke de Bruin <bdbruin@gmail.com>
> > wrote:
> >
> > >
> > > > On 4 Jun 2016, at 03:16, Maxime Beauchemin <
> > > > maximebeauchemin@gmail.com> wrote the following:
> > > >
> > > > Caching is a last resort solution and probably not a good thing
> > > > here. It would introduce lag and confusion.
> > > >
> > >
> > > Agreed.
> > >
> > > > You seem to say that some things are evaluated twice within a
> > > > scheduler cycle? What would that be?
> > >
> > > In the old scheduler (before AIRFLOW-128, the refactor of process_dag)
> > > it was actually 3 times, although the impact was probably closer to 2
> > > times due to the query cache:
> > >
> > > 1. get_active_runs was calling ti.are_dependencies_met twice: first as
> > > part of the overall deadlock check and, because the results were not
> > > stored, again for the individual dag_run deadlock check.
> > > 2. process_dag calls ti.is_runnable(), which in turn also calls
> > > ti.are_dependencies_met().
> > >
> > > In the new version DagRun.update_state() gets called, which is more or
> > > less a copy of get_active_runs except for the fact that I store the
> > > deadlock checks. process_dag still calls ti.is_runnable() at the moment.
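For illustration, "storing the deadlock checks" amounts to memoizing the
dependency check within one scheduling pass, roughly as sketched below. The
helper names and structure are made up for this sketch and are not the actual
AIRFLOW-128 code:

# Memoize ti.are_dependencies_met() within a single scheduler pass so the
# overall deadlock check and the per-dag_run check share one result per task
# instance. Illustrative only; the state names are simplified.
UNFINISHED = {'queued', 'running', 'up_for_retry', None}

def evaluate_dag_run(task_instances):
    deps_met = {}  # (task_id, execution_date) -> bool, computed at most once

    def deps_met_for(ti):
        key = (ti.task_id, ti.execution_date)
        if key not in deps_met:
            deps_met[key] = ti.are_dependencies_met()  # the expensive call
        return deps_met[key]

    unfinished = [ti for ti in task_instances if ti.state in UNFINISHED]
    # The run is deadlocked when unfinished tasks exist but none can progress.
    deadlocked = bool(unfinished) and not any(deps_met_for(ti) for ti in unfinished)
    # Later per-task checks in the same pass reuse deps_met instead of re-querying.
    return deadlocked, deps_met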
> > >
> > > >
> > > > Another option is to reduce the number of database interactions and
> > > > make sure indexes are in place and balanced (you may want to rebuild
> > > > your indexes and make sure they are getting used). If the DB is the
> > > > point of contention, we can assume that getting a fast DB, or
> > > > rationalizing overall DB interactions (less frequent heartbeats?),
> > > > would also help.
> > >
> > > I actually verified whether this would make a difference as part of
> > > AIRFLOW-128. It doesn’t really: aggregation functions always require a
> > > table scan, and ti.are_dependencies_met() uses 5 of them (though they
> > > probably get optimized to one).
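For context, the dependency check boils down to a handful of aggregates over
the upstream rows in task_instance, roughly of the shape below. This is an
approximation written with SQLAlchemy to show why the call scans
task_instance rather than leaning on an index, not the exact Airflow query:

# Approximate shape of the per-task aggregation behind are_dependencies_met():
# count upstream task instances per terminal state in one query (5 aggregates).
from sqlalchemy import func, case
from airflow.models import TaskInstance as TI
from airflow.utils.state import State

def upstream_state_counts(session, task, execution_date):
    upstream_ids = [t.task_id for t in task.upstream_list]
    qry = session.query(
        func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)),
        func.sum(case([(TI.state == State.SKIPPED, 1)], else_=0)),
        func.sum(case([(TI.state == State.FAILED, 1)], else_=0)),
        func.sum(case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)),
        func.count(TI.task_id),
    ).filter(
        TI.dag_id == task.dag_id,
        TI.task_id.in_(upstream_ids),
        TI.execution_date == execution_date,
    )
    return qry.first()  # (successes, skipped, failed, upstream_failed, done)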
> > >
> > > >
> > > > Another option is *distributed scheduling*, where, as part of the
> > > > worker, after finishing a task instance it would evaluate [only] the
> > > > directly downstream tasks for execution. In theory the scheduler could
> > > > just seed the DAG and the downstream tasks would be triggered by their
> > > > parents immediately after, almost like *callbacks*. Then it matters a
> > > > lot less how long scheduling cycles actually take…
> > >
> > >
> > > Absolutely. I am tinkering with having a “notify_downstream” function
> > > in the task instances that calls an “update” function on the downstream
> > > task. It is conceptually quite a small step to check whether the trigger
> > > rules have been met and then notify the scheduler that the task can be
> > > scheduled. If you think about it further, you could probably remove the
> > > scheduler logic for determining which task gets scheduled by putting the
> > > TaskInstance on an MQ and letting the workers decide which one they can
> > > pick up. The scheduler would then just do some garbage collection. I
> > > think this could be the next step, not one I am prepared to undertake
> > > right now. I think a large speed-up can be gained from the initial
> > > suggestion. I am working on a PoC that will let us see the difference
> > > in speed.
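To make the queue-based variant concrete, here is a rough, hypothetical
sketch; publish/consume stand in for whichever message queue client is used,
and none of this is an existing Airflow API or an actual proposal from this
thread:

# On task completion the worker enqueues the directly downstream task-instance
# keys ("almost like callbacks"); consumers only run a key whose trigger rule
# is met, leaving the scheduler little more than garbage collection.
def on_task_success(task, execution_date, publish):
    for downstream in task.downstream_list:
        publish({
            "dag_id": task.dag_id,
            "task_id": downstream.task_id,
            "execution_date": execution_date.isoformat(),
        })

def worker_loop(dag, consume, trigger_rule_met, run_task_instance):
    # Workers decide for themselves which messages they can pick up.
    for msg in consume():
        task = dag.get_task(msg["task_id"])
        if trigger_rule_met(task, msg["execution_date"]):
            run_task_instance(task, msg["execution_date"])
        # else: requeue; a periodic integrity check catches anything dropped.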
> > >
> > > >
> > > > Max
> > > >
> > > > On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <bdbruin@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I am looking at speeding up the scheduler. Currently loop times
> > > >> increase with the number of tasks in a dag. This is due to
> > > >> TaskInstance.are_dependencies_met executing several aggregation
> > > >> functions on the database. These calls are expensive: between
> > > >> 0.05-0.15s per task, and for every scheduler loop this gets called
> > > >> twice. This call is where the scheduler spends around 90% of its
> > > >> time when evaluating dags and is the reason people with a large
> > > >> number of tasks per dag see quite large loop times (north of 600s).
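To put those figures together, the loop time grows linearly with the number
of tasks; a back-of-the-envelope calculation (the dag size of 3000 tasks is
an assumed example, the other numbers are from the paragraph above):

n_tasks = 3000                  # assumed dag size, for illustration
per_call = 0.1                  # seconds, midpoint of the 0.05-0.15s range
calls_per_loop = 2
print(n_tasks * per_call * calls_per_loop)  # roughly 600 seconds per loop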
> > > >>
> > > >> I see 2 options to optimize the loop without going to a
> > > >> multiprocessing approach, which would just push the problem down the
> > > >> line (i.e. to the db, or to when you don’t have enough cores
> > > >> anymore).
> > > >>
> > > >> 1. Cache the call to TI.are_dependencies_met, by either caching in
> > > >> something like memcache or by removing the need for the double call
> > > >> (update_state and process_dag both make the call to
> > > >> TI.are_dependencies_met). This would more or less cut the time in
> > > >> half.
> > > >>
> > > >> 2. Notify the downstream tasks of a state change of an upstream
> > > >> task. This would remove the need for the aggregation as the task
> > > >> would just ‘know’. It is a bit harder to implement correctly, as you
> > > >> need to make sure you stay in a consistent state. Obviously you
> > > >> could still run an integrity check once in a while. This option
> > > >> would make the aggregation event-based and significantly reduce the
> > > >> time spent here, to around 1-5% of the current scheduler. There is a
> > > >> slight overhead added at a state change of the TaskInstance (managed
> > > >> by the TaskInstance itself).
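One way to picture option 2: each task instance carries a denormalized count
of upstream dependencies that are not yet in a terminal state; an upstream
state change decrements the counters of its direct downstream task instances,
and a task becomes schedulable when its counter reaches zero. The sketch
below is hypothetical; the unmet_upstream attribute and helper names are made
up and are not Airflow code:

# State change handler plus the occasional integrity check mentioned above.
TERMINAL = {"success", "failed", "skipped", "upstream_failed"}

def on_state_change(ti, new_state, downstream_tis, notify_scheduler):
    old_state, ti.state = ti.state, new_state
    if new_state in TERMINAL and old_state not in TERMINAL:
        for dti in downstream_tis:           # direct downstream TIs only
            dti.unmet_upstream -= 1
            if dti.unmet_upstream == 0:      # the task now just "knows"
                notify_scheduler(dti)

def integrity_check(tis, upstream_map):
    # Recompute the counters from scratch once in a while to stay consistent.
    for ti in tis:
        ti.unmet_upstream = sum(
            1 for up in upstream_map[ti.task_id] if up.state not in TERMINAL
        )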
> > > >>
> > > >> What do you think? My preferred option is #2. Am I missing any
> > > >> other options? Are scheduler loop times a concern at all?
> > > >>
> > > >> Thanks
> > > >> Bolke
> > > >>
> > > >>
> > > >>
> > >
> > >
> >
> >
> > --
> > Lance Norskog
> > lance.norskog@gmail.com
> > Redwood City, CA
> >
>



-- 
Lance Norskog
lance.norskog@gmail.com
Redwood City, CA
