airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: How to add hooks for strong deployment consistency?
Date Thu, 01 Mar 2018 16:30:26 GMT
I'm curious to hear which DagFetcher abstraction people would build or want
to use.

So far it sounded like the most popular and flexible approach would be a
`GitDagFetcher`, where all SHAs and refs become a possibility, as opposed
to, say, a `TarballOnS3DagFetcher`, which would require more manual
artifact management and versioning, i.e. additional [human] workflow on
top of the already existing git-based workflow.
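
To make that concrete, here's a rough sketch of what the abstraction could
look like (all of these names are hypothetical, nothing like this exists
in Airflow today):

    from abc import ABC, abstractmethod

    class DagFetcher(ABC):
        """Hands the scheduler/workers a DagBag pinned to a version."""

        @abstractmethod
        def fetch(self, version=None):
            """Return a DagBag as of `version` (None means latest)."""

    # Concrete implementations could be a FileSystemDagFetcher (no version
    # semantics), a GitDagFetcher (version == any SHA or ref), or a
    # TarballOnS3DagFetcher (version == an artifact key on S3).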

One way I've seen this done before is with a Git FUSE (filesystem in
userspace) hack that creates a virtual filesystem where every SHA and ref
in the Git repo is exposed as a subfolder, and under each ref subfolder
the whole repo sits as of that ref. Of course all the files are virtual
and fetched at access time by the virtual filesystem using the Git API. So
if you simply point the DagBag loader to the right [virtual] directory, it
will import the right version of the DAG. In the git world, the
alternative to that is managing temp folders and doing shallow clones,
which seems like much more of a headache. Note that one tradeoff is that
git, and whatever it depends on, then needs to be highly available.
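
Assuming such a mount exists, the fetcher side could be as simple as this
sketch (the mount point and layout here are made up for illustration):

    import os
    from airflow.models import DagBag

    # Hypothetical FUSE mount where every SHA/ref shows up as a subfolder,
    # e.g. /mnt/gitfs/<ref>/dags/...
    GITFS_MOUNT = "/mnt/gitfs"

    def dagbag_for_ref(ref, dags_subpath="dags"):
        """Load a DagBag as of a specific git SHA or ref."""
        dag_folder = os.path.join(GITFS_MOUNT, ref, dags_subpath)
        # The virtual filesystem fetches file contents lazily from git
        # when the DagBag imports the modules under this folder.
        return DagBag(dag_folder=dag_folder)

    # e.g. dagbag_for_ref("3f2a9c1").get_dag("my_dag")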

Max

On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dcapwell@gmail.com> wrote:

> Thanks for all the details! With a pluggable fetcher we would be able to
> add our own logic for how to fetch, so that sounds like a good place to
> start for something like this!
>
> On Wed, Feb 28, 2018, 4:39 PM Joy Gao <joyg@wepay.com> wrote:
>
> > +1 on DagFetcher abstraction, very airflow-esque :)
> >
> > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > <maximebeauchemin@gmail.com> wrote:
> > > Addressing a few of your questions / concerns:
> > >
> > > * The scheduler uses a multiprocess queue to queue up tasks; each
> > > subprocess is in charge of a single DAG "scheduler cycle", which
> > > triggers what it can for active DagRuns. Currently it fills the DagBag
> > > from the local file system, looking for a specific module where the
> > > master process last saw that DAG. Fetching the DAG is a metadata
> > > operation; DAG artifacts shouldn't be too large, so we can assume that
> > > it takes seconds at most to fetch a DAG, which is ok. We generally
> > > assume that the scheduler should fully cycle every minute or so. A
> > > version-aware DagFetcher could also implement some sort of caching if
> > > that was a concern (it shouldn't be, though; see the sketch after
> > > these bullets).
> > > * For consistency within the whole DagRun, the scheduler absolutely
> > > has to read the right version. If tasks got removed they would never
> > > get scheduled and consistency cannot be achieved.
> > > * TaskInstances get created the first time they are identified as
> > > runnable by the scheduler and are born with a queued status I believe
> > > (from memory, haven't read the latest code to confirm). The worker
> > > double checks and sets it as running as part of a database transaction
> > > to avoid double-firing.
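> > >
> > > To illustrate the caching point in the first bullet, something as
> > > small as this sketch would do (`fetcher` stands for whatever
> > > DagFetcher implementation is configured, nothing that exists today):
> > >
> > >     from functools import lru_cache
> > >
> > >     @lru_cache(maxsize=32)
> > >     def cached_dagbag(fetcher, version):
> > >         # A given (fetcher, version) pair always resolves to the same
> > >         # DAG files, so repeated scheduler cycles in the same process
> > >         # can reuse the DagBag they already fetched.
> > >         return fetcher.fetch(version)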
> > >
> > > Max
> > >
> > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <chris@crpalmer.com> wrote:
> > >
> > >> I'll preface this with the fact that I'm relatively new to Airflow,
> > >> and haven't played around with a lot of the internals.
> > >>
> > >> I find the idea of a DagFetcher interesting, but would we worry about
> > >> slowing down the scheduler significantly? If the scheduler is having
> > >> to "fetch" multiple different DAG versions, be it git refs or
> > >> artifacts from Artifactory, we are talking about adding significant
> > >> time to each scheduler run. Also, how would the scheduler know which
> > >> DAGs to fetch from where if there aren't local files on disk listing
> > >> those DAGs? Maybe I'm missing something in the implementation.
> > >>
> > >> It seems to me that the fetching of the different versions should be
> > >> delegated to the Task (or TaskInstance) itself. That ensures we only
> > >> spend the time to "fetch" the version that is needed when it is
> > >> needed. One downside might be that each TaskInstance running for the
> > >> same version of the DAG might end up doing the "fetch" independently
> > >> (duplicating that work).
> > >>
> > >> I think this could be done by adding some version attribute to the
> > >> DagRun that gets set at creation, and have the scheduler pass that
> > >> version to the TaskInstances when they are created. You could even
> > >> extend this so that you could have an arbitrary set of
> > >> "executor_parameters" that get set on a DagRun and are passed to
> > >> TaskInstances. Then the specific Executor class that is running that
> > >> TaskInstance could handle the "executor_parameters" as it sees fit.
> > >>
> > >> One thing I'm not clear on is how and when TaskInstances are created.
> > >> When the scheduler first sees a specific DagRun, do all the
> > >> TaskInstances get created immediately, but only some of them get
> > >> queued? Or does the scheduler only create those TaskInstances which
> > >> can be queued right now?
> > >>
> > >> In particular, if a DagRun gets created and while it is running the
> > >> DAG is updated and a new Task is added, will the scheduler pick up
> > >> that new Task for the running DagRun? If the answer is yes, then my
> > >> suggestion above would run the risk of scheduling a Task for a DAG
> > >> version where that Task didn't exist. I'm sure you could handle that
> > >> somewhat gracefully but it's a bit ugly.
> > >>
> > >> Chris
> > >>
> > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > >> maximebeauchemin@gmail.com> wrote:
> > >>
> > >> > At a higher level I want to say a few things about the idea of
> > >> > enforcing version consistency within a DagRun.
> > >> >
> > >> > One thing we've been talking about is the need for a "DagFetcher"
> > >> > abstraction, whose first implementation, replacing and mimicking
> > >> > the current behavior, would be "FileSystemDagFetcher". A specific
> > >> > DagFetcher implementation may or may not support version semantics,
> > >> > but if it does it should be able to receive a version id and return
> > >> > the proper version of the DAG object. For instance that first
> > >> > "FileSystemDagFetcher" would not support version semantics, but
> > >> > perhaps a "GitRepoDagFetcher" would, and an "ArtifactoryDagFetcher"
> > >> > or "TarballInS3DagFetcher" may as well.
> > >> >
> > >> > Of course that assumes that the scheduler knows and stores the
> > >> > active version number when generating a new DagRun, and that this
> > >> > information is leveraged on subsequent scheduler cycles and on
> > >> > workers when tasks are executed.
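> > >> >
> > >> > In pseudo-Python the flow could look something like this (a rough
> > >> > sketch only; stashing the version in the DagRun conf and the
> > >> > `current_version()` method are made up for illustration):
> > >> >
> > >> >     def create_versioned_dagrun(fetcher, dag, execution_date):
> > >> >         # Scheduler side: stamp the active version on the new DagRun.
> > >> >         version = fetcher.current_version()  # e.g. the SHA of HEAD
> > >> >         return dag.create_dagrun(
> > >> >             run_id="scheduled__%s" % execution_date,
> > >> >             execution_date=execution_date,
> > >> >             state="running",
> > >> >             conf={"version": version},
> > >> >         )
> > >> >
> > >> >     def task_for_run(fetcher, dag_run, task_id):
> > >> >         # Later cycles / workers: load the DAG as of that version.
> > >> >         version = (dag_run.conf or {}).get("version")
> > >> >         dag = fetcher.fetch(version).get_dag(dag_run.dag_id)
> > >> >         return dag.get_task(task_id)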
> > >> >
> > >> > This could also enable things like "remote" backfills (non-local,
> > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > >> > (assuming a "GitRepoDagFetcher").
> > >> >
> > >> > There are [perhaps] unintuitive implications where clearing a
> > >> > single task would then re-run the old DAG definition on that task
> > >> > (since the version was stamped in the DagRun and hasn't changed),
> > >> > but deleting/recreating a DagRun would run the latest version (or
> > >> > any other version that may be specified for that matter).
> > >> >
> > >> > I'm unclear on how much work that represents exactly, but it's
> > >> > certainly doable and may only require changing part of the DagBag
> > >> > class and a few other places.
> > >> >
> > >> > Max
> > >> >
> > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapwell@gmail.com> wrote:
> > >> >
> > >> > > Thanks for your feedback!
> > >> > >
> > >> > > Option 1 is a non-starter for us. The reason is we have DAGs that
> > >> > > take 9+ hours to run.
> > >> > >
> > >> > > Option 2 is more where my mind was going, but it's rather large.
> > >> > > How I see it, you need an MVCC DagBag that's aware of multiple
> > >> > > versions (what provides the version?). Assuming you can track
> > >> > > which versions the active dag runs point to, you know how to
> > >> > > clean up (fine with that being external). The pro here is that
> > >> > > you get snapshot isolation for a dag_run; the con is more
> > >> > > bookkeeping, and it requires the deploy process to work with this
> > >> > > (that last part may be a good thing though).
> > >> > >
> > >> > > The only other option I can think of is to lock deploys so the
> > >> > > system only picks up new versions when no dag_run holds the lock.
> > >> > > This is flawed for many reasons, but breaks horribly for dag_runs
> > >> > > that take minutes (I assume 99% do).
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <joyg@wepay.com> wrote:
> > >> > >
> > >> > > > Hi David!
> > >> > > >
> > >> > > > Thank you for clarifying, I think I understand your concern
> > >> > > > now. We currently also work around this by making sure a dag is
> > >> > > > turned off when we deploy a new version. We also make sure our
> > >> > > > jobs are idempotent and retry-enabled in the case when we forget
> > >> > > > to turn off the job, so the issue hasn't caused us too much
> > >> > > > headache.
> > >> > > >
> > >> > > > I do agree that it would be nice for Airflow to have the option
> > >> > > > to guarantee a single version of dag per dag run. I see two
> > >> > > > approaches:
> > >> > > >
> > >> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> > >> > > > (2) If a dag is updated, the current dagrun continues but uses
> > >> > > > the version before the update.
> > >> > > >
> > >> > > > (1) requires some mechanism to compare dag generations. One
> > >> > > > option is to hash the dagfile, store that value in the dagrun
> > >> > > > table, and compare against it each time a task is running. In
> > >> > > > case the hash value is different, update the hash value, then
> > >> > > > fail/retry the dag. I think this is a fairly safe approach.
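> > >> > > >
> > >> > > > Roughly something like this (just a sketch; there is no such
> > >> > > > hash column on the dagrun table today):
> > >> > > >
> > >> > > >     import hashlib
> > >> > > >
> > >> > > >     def dag_fingerprint(fileloc):
> > >> > > >         # Hash the dag file's bytes; any edit changes the digest.
> > >> > > >         with open(fileloc, "rb") as f:
> > >> > > >             return hashlib.sha256(f.read()).hexdigest()
> > >> > > >
> > >> > > >     # At task run time, compare dag_fingerprint(dag.fileloc)
> > >> > > >     # against the value stored on the dagrun row; if it differs,
> > >> > > >     # update the stored value and fail/retry the run.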
> > >> > > >
> > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > >> > > > tracks the location of the dag file, but the actual content of
> > >> > > > the dag file is never versioned. When a task instance starts
> > >> > > > running, it dynamically re-processes the dag file specified by
> > >> > > > the fileloc, generates all the task objects from the dag file,
> > >> > > > and fetches the task object by task_id in order to execute it.
> > >> > > > So in order to guarantee that each dagrun runs a specific
> > >> > > > version, previous versions must be maintained on disk somehow
> > >> > > > (maintaining this information in memory is difficult, since if
> > >> > > > the scheduler/worker shuts down, that information is lost). This
> > >> > > > makes it a pretty big change, and I haven't thought much on how
> > >> > > > to implement it.
> > >> > > >
> > >> > > > I'm personally leaning towards (1) for the sake of simplicity.
> > >> > > > Note that some users may not want the dag to fail/retry even
> > >> > > > when the dag is updated, so this should be an optional feature,
> > >> > > > not required.
> > >> > > >
> > >> > > > My scheduler-foo isn't that great, so curious what others have
> > >> > > > to say about this.
> > >> > > >
> > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapwell@gmail.com> wrote:
> > >> > > > > Thanks for the reply Joy, let me walk you through things as
> > >> > > > > they are today:
> > >> > > > >
> > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > >> > > > > updates to logic, this is done live once it's released
> > >> > > > > 2) the python script in the DAG folder doesn't actually have
> > >> > > > > DAGs in it but is a shim layer to allow us to deploy in an
> > >> > > > > atomic way for a single host
> > >> > > > >   2.1) this script reads a file on local disk (less than a
> > >> > > > > disk page size) to find the latest git commit deployed
> > >> > > > >   2.2) re-does the airflow DAG load process but pointing to
> > >> > > > > the git commit path
> > >> > > > >
> > >> > > > > Example directory structure:
> > >> > > > >
> > >> > > > > /airflow/dags/shim.py
> > >> > > > > /airflow/real_dags/
> > >> > > > >                             /latest # pointer to latest commit
> > >> > > > >                             /[git commit]/
> > >> > > > >
> > >> > > > > This is how we make sure deploys are consistent within a
> > >> > > > > single task.
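> > >> > > > >
> > >> > > > > In case it helps picture it, a simplified sketch of what such
> > >> > > > > a shim can look like (not our exact code):
> > >> > > > >
> > >> > > > >     # /airflow/dags/shim.py
> > >> > > > >     import os
> > >> > > > >     from airflow.models import DagBag
> > >> > > > >
> > >> > > > >     REAL_DAGS = "/airflow/real_dags"
> > >> > > > >
> > >> > > > >     # Tiny file holding the git commit of the latest deploy.
> > >> > > > >     with open(os.path.join(REAL_DAGS, "latest")) as f:
> > >> > > > >         commit = f.read().strip()
> > >> > > > >
> > >> > > > >     # Re-do the DAG load against that commit's folder and
> > >> > > > >     # expose the DAGs here so Airflow's loader discovers them.
> > >> > > > >     dag_bag = DagBag(dag_folder=os.path.join(REAL_DAGS, commit))
> > >> > > > >     for dag in dag_bag.dags.values():
> > >> > > > >         globals()[dag.dag_id] = dag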
> > >> > > > >
> > >> > > > >
> > >> > > > > Now, let's assume we have a fully atomic commit process and
> > >> > > > > are able to upgrade DAGs at the exact same moment.
> > >> > > > >
> > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > >> > > > > tasks, Task1 and Task2
> > >> > > > > At time T1 Task1 is picked up by Worker1, so starts executing
> > >> > > > > the task (V1 logic)
> > >> > > > > At time T2 the deploy commit happens, current DAG version: V2
> > >> > > > > At time T3, Task2 is picked up by Worker2, so starts executing
> > >> > > > > the task (V2 logic)
> > >> > > > >
> > >> > > > > In many cases this isn't really a problem (a tuning config
> > >> > > > > change to a hadoop job), but as we have more people using
> > >> > > > > Airflow this is causing a lot of time spent debugging why
> > >> > > > > production acted differently than expected (the problem was
> > >> > > > > already fixed... why is it still here?). We also see that some
> > >> > > > > tasks expect a given behavior from other tasks, and since they
> > >> > > > > live in the same git repo they can modify both tasks at the
> > >> > > > > same time if a breaking change is needed; but when this rolls
> > >> > > > > out to prod there isn't a way to do this other than turning
> > >> > > > > off the DAG and logging in to all hosts to verify it is fully
> > >> > > > > deployed.
> > >> > > > >
> > >> > > > > We would like to remove this confusion, make
> > >> > > > > generations/versions (same thing really) exposed to users, and
> > >> > > > > make sure that for a single dag_run only one version is used.
> > >> > > > >
> > >> > > > > I hope this is more clear.
> > >> > > > >
> > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <joyg@wepay.com> wrote:
> > >> > > > >
> > >> > > > >> Hi David,
> > >> > > > >>
> > >> > > > >> Do you mind providing a concrete example of the scenario in
> > >> > > > >> which scheduler/workers see different states (I'm not 100%
> > >> > > > >> sure if I understood the issue at hand)?
> > >> > > > >>
> > >> > > > >> And by same dag generation, are you referring to the dag
> > >> > > > >> version? (DAG version is currently not supported at all, but
> > >> > > > >> I can see it being a building block for future use cases.)
> > >> > > > >>
> > >> > > > >> Joy
> > >> > > > >>
> > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapwell@gmail.com> wrote:
> > >> > > > >>
> > >> > > > >> > My current thinking is to add a field to the dag table
> > >> > > > >> > that is optional and provided by the dag. We currently
> > >> > > > >> > intercept the load path, so we could use this field to make
> > >> > > > >> > sure we load the same generation. My concern here is the
> > >> > > > >> > interaction with the scheduler; I'm not familiar enough
> > >> > > > >> > with that logic to predict corner cases where this would
> > >> > > > >> > fail.
> > >> > > > >> >
> > >> > > > >> > Any other recommendations for how this could be done?
> > >> > > > >> >
> > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapwell@gmail.com> wrote:
> > >> > > > >> >
> > >> > > > >> > > We have been using airflow for logic that delegates to
> > >> > > > >> > > other systems, so we inject a task that all tasks depend
> > >> > > > >> > > on to make sure all resources used are the same for all
> > >> > > > >> > > tasks in the dag. This works well for tasks that delegate
> > >> > > > >> > > to external systems, but people are starting to need to
> > >> > > > >> > > run logic in airflow, and the fact that the scheduler and
> > >> > > > >> > > all workers can see different states is causing issues.
> > >> > > > >> > >
> > >> > > > >> > > We can make sure that all the code is deployed in a
> > >> > > > >> > > consistent way, but we need help from the scheduler to
> > >> > > > >> > > tell the workers the current generation for a DAG.
> > >> > > > >> > >
> > >> > > > >> > > My question is, what would be the best way to modify
> > >> > > > >> > > airflow to allow DAGs to define a generation value that
> > >> > > > >> > > the scheduler could send to workers?
> > >> > > > >> > >
> > >> > > > >> > > Thanks
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> >
>
