airflow-dev mailing list archives

From Maxime Beauchemin <>
Subject Re: Make Scheduler More Centralized
Date Thu, 16 Mar 2017 04:56:10 GMT
A few related thoughts about the scheduler.

The scheduler is growing to take on much more than just scheduling, so much
so that "supervisor" would be a better name for it. It includes:
* parsing DAGs (eventually it may serialize their metadata to the database
to help make the web server stateless)
* scheduling (duh.)
* monitoring heartbeats and handling related failures (email,
on_failure_callback, ...)
* buffering + prioritization while applying pool constraints
* (proposed, new) handling all task failures?

One approach would be to break down this workload and distribute it, to a
point where it really doesn't matter who's doing the "supervising" work. A
worker slot could be assigned to run a single "supervisor cycle" for a
single DAG for instance. Finishing a task run could also trigger an attempt
to schedule the dependent tasks (to allow for very low latency scheduling!)
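
To make that last idea concrete, here is a minimal sketch of a task
completion triggering an attempt to schedule its dependents. The dict-based
DAG and the `on_task_success` function are hypothetical illustrations, not
Airflow APIs; a real implementation would read dependencies and states from
the metadata DB.

```python
from typing import Dict, List, Set

# Hypothetical toy DAG: task id -> ids of its direct upstream tasks.
UPSTREAM: Dict[str, Set[str]] = {
    "extract": set(),
    "transform": {"extract"},
    "audit": {"extract"},
    "load": {"transform", "audit"},
}


def on_task_success(task_id: str, succeeded: Set[str]) -> List[str]:
    """Record a success and return the dependents that just became runnable."""
    succeeded.add(task_id)
    ready = []
    for candidate, upstream in UPSTREAM.items():
        if candidate in succeeded:
            continue
        # Only look at direct dependents of the task that just finished,
        # and only release them once every upstream task has succeeded.
        if task_id in upstream and upstream <= succeeded:
            ready.append(candidate)
    return sorted(ready)
```

Whoever finishes a task (a worker, in this model) calls `on_task_success`
and can enqueue the returned tasks right away instead of waiting for the
next scheduler pass.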

The challenge with this approach is that the logs are all over the place
(until we figure out a good distributed logging solution), and that there's a
need for a buffered window to handle prioritization. In many cases
prioritization isn't used or important, and low latency is much more
important. I would tend to vote down changes and designs that take us
further away from low-latency scheduling.
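
As a rough illustration of why prioritization wants a buffered window: with
several candidates collected first, the highest-priority ones can claim the
scarce pool slots, whereas scheduling each task the instant it becomes ready
hands the slot to whichever arrives first. The tuple shape and the
`drain_window` name below are assumptions made for this sketch, not Airflow
code.

```python
import heapq
from typing import Dict, List, Tuple

# Hypothetical candidate shape: (priority_weight, task_id, pool_name).
Candidate = Tuple[int, str, str]


def drain_window(window: List[Candidate], free_slots: Dict[str, int]) -> List[str]:
    """Pick tasks from a buffered window, highest weight first, within pool limits."""
    # heapq is a min-heap, so negate the weight to pop the highest weight first.
    heap = [(-weight, task_id, pool) for weight, task_id, pool in window]
    heapq.heapify(heap)
    slots = dict(free_slots)  # copy so the caller's view is not mutated
    picked = []
    while heap:
        _, task_id, pool = heapq.heappop(heap)
        if slots.get(pool, 0) > 0:
            slots[pool] -= 1
            picked.append(task_id)
        # Tasks whose pool is full are simply left for a later window.
    return picked
```

The longer the window, the better the ordering and the worse the latency,
which is the trade-off described above.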

I understand this email doesn't clarify things, but wanted to get this out.


On Wed, Mar 15, 2017 at 2:14 PM, Bolke de Bruin <> wrote:

> Hi Rui,
> We have been discussing this during the hackathon at Airbnb as well.
> Besides the reservations Gerard is documenting, I am also not enthusiastic
> about this design. Currently, the scheduler is our main issue in scaling.
> Scheduler runs will take longer and longer with more DAGs and more complex
> DAGs (i.e. more tasks in a DAG). To move more things into the scheduler
> makes it more difficult to move things out again. This is required when we
> want to move to an event driven / snowballing scheduler.
> I would suggest documenting and enforcing the contracts between the
> scheduler - executor - task instance. We are lax in that respect and this
> is where a lot of issues stem from. Also the executor is the weak point here
> as it doesn’t do anything with the task state, but it does handle them. The
> points Gerard makes are very valid and we should improve our assumptions of
> the underlying bus.
> Cheers
> Bolke
> > On 14 Mar 2017, at 15:08, Rui Wang <> wrote:
> >
> > Hi,
> > The design doc below I created is trying to make airflow scheduler more
> > centralized. Briefly speaking, I propose moving state change of
> > TaskInstance to scheduler. You can see the reasons for this change below.
> >
> >
> > Could you take a look and comment if you see anything that does not make
> > sense?
> >
> > -Rui
> >
> > ------------------------------------------------------------
> > Current
> > The state of TaskInstance is changed by both scheduler and worker.
> > On the worker side, the worker monitors TaskInstance and changes the state
> > to RUNNING, then to SUCCESS if the task succeeds, or to UP_FOR_RETRY or
> > FAILED if the task fails. The worker also runs the failure email logic and
> > the failure callback logic.
> > Proposal
> > The general idea is to make a centralized scheduler and make
> > workers dumb. A worker should not change the state of a TaskInstance, but
> > just execute what it is assigned and report the result of the task. Instead,
> > the scheduler should make the decision on TaskInstance state changes.
> > Ideally, workers should not even handle the failure emails and callbacks
> > unless the scheduler asks them to do so.
> > Why
> > A worker does not have as much information as the scheduler has. There were
> > bugs observed where a worker got into trouble but could not decide how to
> > change the task state due to lack of information. Although there is the
> > airflow metadata DB, it is still not easy to share all the information
> > that the scheduler has with workers.
> >
> > We can also ensure a consistent environment. There are slight differences
> > in the chef recipes for the different workers which can cause strange
> > issues when DAGs parse on one but not the other.
> >
> > In the meantime, moving state changes to the scheduler can reduce the
> > complexity of airflow. It especially helps when airflow needs to move to
> > distributed schedulers. In that case, state changes made everywhere by both
> > schedulers and workers are harder to maintain.
> > How to change
> > After lots of discussion, the following steps will be taken:
> >
> > 1. Add a new column to TaskInstance table. Worker will fill this column
> > with the task process exit code.
> >
> > 2. The only state a worker will set is RUNNING, when it is ready to run
> > the task. There was debate on moving RUNNING to the scheduler as well. If
> > RUNNING moved to the scheduler, either the scheduler marks TaskInstance
> > RUNNING before it gets into the queue, or the scheduler checks the status
> > code in the column above, which is updated by the worker when it is ready to
> > run the task. In the former case, from the user's perspective, it is bad to
> > mark TaskInstance as RUNNING when the worker is not ready to run: the user
> > could be confused. In the latter case, the scheduler could mark the task as
> > RUNNING late due to the schedule interval, which is still not a good user
> > experience. Since only the worker knows when it is ready to run the task,
> > the worker should still deliver this message to the user by setting the
> > RUNNING state.
> >
> > 3. In all other cases, the worker should not change the state of
> > TaskInstance, but save a defined status code into the column above.
> >
> > 4. Worker still handles failure emails and callbacks because there was
> > concern that the scheduler could use too many resources running failure
> > callbacks, given unpredictable callback sizes. (I think ideally the
> > scheduler should treat failure callbacks and emails as tasks, and assign
> > such tasks to workers after TaskInstance state changes correspondingly.)
> > Eventually this logic will be moved to the workers once there is support
> > for multiple distributed schedulers.
> >
> > 5. In scheduler's loop, scheduler should check TaskInstance status code,
> > then change state and retry/fail TaskInstance correspondingly.
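
A minimal sketch of steps 1, 3 and 5 of the quoted proposal, for readers who
want to see the division of labour. Everything here is an assumption made
for illustration: the `TaskInstanceRow` class, the `exit_code` column name,
the `max_tries` field, and the convention that exit code 0 means success.
Only the state names come from the proposal.

```python
from dataclasses import dataclass
from typing import Optional

# State names as used in the proposal; the string values are illustrative.
RUNNING, SUCCESS, UP_FOR_RETRY, FAILED = "running", "success", "up_for_retry", "failed"


@dataclass
class TaskInstanceRow:
    """Hypothetical stand-in for a row of the TaskInstance table."""
    state: str
    exit_code: Optional[int] = None  # the proposed new column (step 1)
    try_number: int = 1
    max_tries: int = 1


def worker_report(ti: TaskInstanceRow, exit_code: int) -> None:
    """Worker side (step 3): record the exit code, leave the state alone."""
    ti.exit_code = exit_code


def scheduler_resolve(ti: TaskInstanceRow) -> str:
    """Scheduler side (step 5): turn a reported exit code into a state change."""
    if ti.state != RUNNING or ti.exit_code is None:
        return ti.state  # nothing reported yet
    if ti.exit_code == 0:  # assumption: 0 means the task succeeded
        ti.state = SUCCESS
    elif ti.try_number < ti.max_tries:
        ti.state = UP_FOR_RETRY
        ti.try_number += 1
        ti.exit_code = None  # clear the code so the next attempt starts clean
    else:
        ti.state = FAILED
    return ti.state
```

Note that the state only changes when `scheduler_resolve` runs, so the
delay between a task finishing and its state updating is bounded by the
scheduler loop interval.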
