airflow-dev mailing list archives

From David Capwell <dcapw...@gmail.com>
Subject Re: How to add hooks for strong deployment consistency?
Date Wed, 28 Feb 2018 02:48:14 GMT
Thanks for your feedback!

Option 1 is a non-starter for us. The reason is we have DAGs that take 9+
hours to run.

Option 2 is more where my mind was going, but it's rather large.  The way I
see it, you need an MVCC DagBag that's aware of multiple versions (what
provides the version?).  Assuming you can track which version each active
dag run points to, you know how to clean up old versions (fine with doing
that externally).  The pro here is you get snapshot isolation for a
dag_run; the con is more bookkeeping, and deploys have to work with this
(that last part may be a good thing though).
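
Rough sketch of the shape I have in mind (untested; none of this exists in
airflow today, the version plumbing is all hypothetical):

    # Hypothetical MVCC-style DagBag: keep every deployed version of a
    # dag and serve the version a given dag_run was pinned to.
    class VersionedDagBag(object):
        def __init__(self):
            self._dags = {}  # (dag_id, version) -> DAG

        def add_dag(self, dag, version):
            self._dags[(dag.dag_id, version)] = dag

        def get_dag(self, dag_id, version):
            # a dag_run records its version at creation time and always
            # resolves through here -> snapshot isolation for the run
            return self._dags.get((dag_id, version))

        def cleanup(self, active_versions):
            # drop versions no active dag_run points to (the extra
            # bookkeeping mentioned above)
            for key in list(self._dags):
                if key[1] not in active_versions:
                    del self._dags[key]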

The only other option I can think of is to lock deploys so the system only
picks up new versions when no dag_run holds the lock.  This is flawed for
many reasons, but it breaks horribly for dag_runs that take minutes (I
assume 99% do).
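
For completeness, the lock idea is roughly this (hypothetical sketch),
which shows why it starves: the deploy can only proceed in a gap where no
run is active, and with enough overlapping dag_runs there may never be
such a gap:

    import threading

    # Hypothetical deploy lock: deploys wait until no dag_run is active.
    class DeployLock(object):
        def __init__(self):
            self._cond = threading.Condition()
            self._active_runs = 0

        def run_started(self):
            with self._cond:
                self._active_runs += 1

        def run_finished(self):
            with self._cond:
                self._active_runs -= 1
                self._cond.notify_all()

        def deploy(self, flip_version_pointer):
            with self._cond:
                while self._active_runs > 0:  # may wait forever
                    self._cond.wait()
                flip_version_pointer()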



On Tue, Feb 27, 2018, 4:50 PM Joy Gao <joyg@wepay.com> wrote:

> Hi David!
>
> Thank you for clarifying, I think I understand your concern now. We
> currently also work around this by making sure a dag is turned off
> when we deploy a new version. We also make sure our jobs are
> idempotent and retry-enabled in case we forget to turn off the job,
> so the issue hasn't caused us too much headache.
>
> I do agree that it would be nice for Airflow to have the option to
> guarantee a single version of a dag per dag run. I see two approaches:
>
> (1) If a dag is updated, the current dagrun fails and/or retries.
> (2) If a dag is updated, the current dagrun continues but uses the
> version from before the update.
>
> (1) requires some mechanism to compare dag generations. One option is
> to hash the dag file, store that value in the dagrun table, and
> compare against it each time a task runs. If the hash value is
> different, update the hash value, then fail/retry the dag. I think
> this is a fairly safe approach.
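>
> Roughly, something like this (untested sketch; the dag_hash column on
> dag_run and the fail_or_retry hook are made up):
>
>     import hashlib
>
>     def file_hash(fileloc):
>         # hash the dag file contents to detect a new deploy
>         with open(fileloc, 'rb') as f:
>             return hashlib.sha256(f.read()).hexdigest()
>
>     def check_generation(dag_run, fileloc, session):
>         current = file_hash(fileloc)
>         if dag_run.dag_hash is None:
>             dag_run.dag_hash = current   # first task of this run
>         elif dag_run.dag_hash != current:
>             dag_run.dag_hash = current   # dag changed mid-run
>             fail_or_retry(dag_run)       # made-up hook
>         session.merge(dag_run)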
>
> (2) is trickier. A dag only has a property "fileloc" which tracks the
> location of the dag file, but the actual content of the dag file is
> never versioned. When a task instance starts running, it dynamically
> re-processes the dag file specified by the fileloc, generates all the
> task objects from the dag file, and fetches the task object by task_id
> in order to execute it. So to guarantee that each dagrun runs a
> specific version, previous versions must be maintained on disk somehow
> (maintaining this information in memory is difficult, since if the
> scheduler/worker shuts down, that information is lost). This makes it
> a pretty big change, and I haven't thought much about how to implement
> it.
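>
> (If someone did pursue (2), the file lookup might be roughly this,
> assuming a made-up on-disk layout that keeps prior versions:
>
>     import os
>
>     def versioned_fileloc(fileloc, version):
>         # look for the copy of the dag file kept for this version;
>         # fall back to the live file if none was kept
>         base, name = os.path.split(fileloc)
>         candidate = os.path.join(base, '.versions', version, name)
>         return candidate if os.path.exists(candidate) else fileloc
>
> but the hard part is populating and garbage-collecting those copies.)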
>
> I'm personally leaning towards (1) for the sake of simplicity. Note
> that some users may not want the dag to fail/retry when the dag is
> updated, so this should be an optional feature, not required.
>
> My scheduler-foo isn't that great, so I'm curious what others have to
> say about this.
>
> On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapwell@gmail.com> wrote:
> > Thanks for the reply, Joy. Let me walk you through things as they
> > are today:
> >
> > 1) we don't stop airflow or disable DAGs while deploying updates to
> > logic; this is done live once it's released
> > 2) the python script in the DAG folder doesn't actually have DAGs in
> > it but is a shim layer that lets us deploy atomically on a single
> > host
> >   2.1) this script reads a file on local disk (smaller than a disk
> > page) to find the latest git commit deployed
> >   2.2) re-does the airflow DAG load process, but pointing at the git
> > commit path
> >
> > Example directory structure:
> >
> > /airflow/dags/shim.py
> > /airflow/real_dags/
> >     /latest        # pointer to latest commit
> >     /[git commit]/
> >
> > This is how we make sure deploys are consistent within a single task.
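> >
> > The shim itself is roughly this (simplified sketch of the real
> > thing):
> >
> >     # shim.py
> >     import os
> >     from airflow.models import DagBag
> >
> >     ROOT = '/airflow/real_dags'
> >     with open(os.path.join(ROOT, 'latest')) as f:
> >         commit = f.read().strip()
> >
> >     # re-do the airflow DAG load against the pinned commit path and
> >     # expose the dags by putting them in this module's globals
> >     dag_bag = DagBag(os.path.join(ROOT, commit))
> >     for dag_id, dag in dag_bag.dags.items():
> >         globals()[dag_id] = dag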
> >
> >
> > Now, let's assume we have a fully atomic commit process and are able
> > to upgrade DAGs at the exact same moment.
> >
> > At time T0 the scheduler knows of DAG V1 and schedules two tasks,
> > Task1 and Task2
> > At time T1 Task1 is picked up by Worker1, which starts executing the
> > task (V1 logic)
> > At time T2 the deploy commit happens; the current DAG version is now
> > V2
> > At time T3 Task2 is picked up by Worker2, which starts executing the
> > task (V2 logic)
> >
> > In many cases this isn't really a problem (a tuning config change to
> > a hadoop job, say), but as we get more people using Airflow, this is
> > causing a lot of time spent debugging why production acted
> > differently than expected (the problem was already fixed... why is
> > it still here?).  We also see that some tasks expect a given
> > behavior from other tasks, and since they live in the same git repo,
> > people can modify both tasks at the same time when a breaking change
> > is needed; but when that rolls out to prod there isn't a way to do
> > it other than turning off the DAG and logging in to all hosts to
> > verify it's fully deployed.
> >
> > We would like to remove this confusion, expose generations/versions
> > (same thing really) to users, and make sure that only one version is
> > used for a single dag_run.
> >
> > I hope this is more clear.
> >
> > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <joyg@wepay.com> wrote:
> >
> >> Hi David,
> >>
> >> Do you mind providing a concrete example of the scenario in which
> >> the scheduler/workers see different states? (I'm not 100% sure I
> >> understood the issue at hand.)
> >>
> >> And by same dag generation, are you referring to the dag version? (DAG
> >> version is currently not supported at all, but I can see it being a
> >> building block for future use cases).
> >>
> >> Joy
> >>
> >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapwell@gmail.com>
> >> wrote:
> >>
> >> > My current thinking is to add a field to the dag table that is
> >> > optional and provided by the dag. We currently intercept the load
> >> > path, so we could use this field to make sure we load the same
> >> > generation.  My concern here is the interaction with the
> >> > scheduler; I'm not familiar enough with that logic to predict
> >> > corner cases where this would fail.
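> >> >
> >> > Very roughly (untested; the generation attribute is made up):
> >> >
> >> >     from airflow import DAG
> >> >
> >> >     dag = DAG('my_dag')  # usual dag definition
> >> >     # advertise which deploy this dag came from
> >> >     dag.generation = open('/airflow/real_dags/latest').read().strip()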
> >> >
> >> > Any other recommendations for how this could be done?
> >> >
> >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapwell@gmail.com>
> >> > wrote:
> >> >
> >> > > We have been using airflow for logic that delegates to other
> >> > > systems, so we inject a task that all tasks depend on to make
> >> > > sure the resources used are the same for all tasks in the dag.
> >> > > This works well for tasks that delegate to external systems,
> >> > > but people are starting to need to run logic in airflow, and
> >> > > the fact that the scheduler and all workers can see different
> >> > > states is causing issues.
> >> > >
> >> > > We can make sure that all the code is deployed in a consistent
> >> > > way, but we need help from the scheduler to tell the workers
> >> > > the current generation for a DAG.
> >> > >
> >> > > My question is: what would be the best way to modify airflow
> >> > > to allow DAGs to define a generation value that the scheduler
> >> > > could send to workers?
> >> > >
> >> > > Thanks
> >> > >
> >> >
> >>
>
>
