airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Driesprong, Fokko" <fo...@driesprong.frl>
Subject Re: [DISCUSS] AIP-12 Persist DAG into DB
Date Sun, 10 Mar 2019 18:18:35 GMT
Thanks Kevin for opening the discussion. I think it is important to have a
clear overview on how to approach the AIP.

First of all, how many DAGs do we have that take 30s to parse? I consider
this bad practice, and this would also result in an unworkable situation
with the current setup of Airflow since it will take a lot of resources on
the webserver/scheduler, and the whole system will become unresponsive. I
will be hard to cope with such DAGs in general.

The idea from the AIP is to have the versioned version of the dag in the
DB, so in the end, you won't need to parse the whole thing every time. Only
when you trigger a DAG, or when you want to see the current status of the
dag.

Like stated earlier, I strongly feel we shouldn't serialize the DAGs as
JSON(5) or pickles in general. For me, this is deferring the pain of
setting up a structure of the DAG object itself.
Having the DAG denormalized in the database will give us cleaner storage of
our DAG. We can, for example, enforce fields by making them not null, so we
know that is something is off at write time, instead of read. Furthermore,
we're missing logical types such as dates, which we efficiently can query
using the indices of the database.
Also, with all serialization formats, evolution isn't trivial. Consider the
situations when:
- We're introducing a new field, and it might be null, therefore we need to
bake in all kinds of logic into the Airflow code, which you don't want.
With proper migration scripts, you could prefill these fields, and make
them not null.
- Changing the models, for example, you still can't change a string-type
into a integer with adding custom logic. In this case, the reviewer needs
to be extra careful that there are no breaking changes introduced. Right
now we're doing minimal forward- and backward compatibilitytesting.

In the case we get too many migrations, we could also squash (some of them)
when preparing the release.

Personally, I don't think the serialization is the issue here. As Max
already mentioned, it is the optimal balance of (de)normalization. From the
user perspective, the serialization won't change much of the behaviour of
Airflow.

For me, instead of having `DAG.serialize()` and `DAG.deser(version)` is not
the ideal approach. But it might be that we're on the same page :-) I
believe it should be something like `DagRun.find('fokkos_dag',
datetime(2018, 03, 01))` and construct the correct version of the dag.
Since there is an uniqueness constrain on dag_id, datetime, this will
always return the same dag. You will get the versioned DagRun as it ran
that time. Serializing the fields adn storing them in the database should
happen transparently when you update the DAG object. When you run a dag,
you'll parse the dag, and then run it. `Dag().create_dagrun(...)`, this
will create a DagRun as the name suggests, if the version of the dag still
exists in the database, it will reuse that one, otherwise it will create a
new version of the DAG (with all the operators etc). In this sense the
version of the DAGs should be done within the Dag(Run).

The versioning will change the behavour from a user perspective. Right now
we store only a single version. For example, the poor mans backfilling
won't work anymore. This is clearing the state from past&future, up- and
downstream, and let it catch up again.
In this case, the old version of the DAG won't exists anymore, and
potentially there are tasks that aren't in the code anymore. In this case
we need to clear the version of the dag, and rerun it with the latest
version `DagRun.find('fokkos_dag', datetime(2018, 03, 01)).clear()`. How we
are going to do clear's downstram in the middle of the dag, that is
something I still have to figure out. Because potentially there are tasks
that can't be rerun because the underlying Python code has changed, both on
user level as on Airflow level. It will be impossible to get these features
pure in that sense.
I would not suggest adding a new status in here, indicating that the task
can't be rerun since it isn't part of the DAG anymore. We have to find the
balance here in adding complexity (also to the scheduler) and features that
we need to introduce to help the user.

Cheers, Fokko

Ps. Jarek, interesting idea. It shouldn't be too hard to make Airflow more
k8s native. You could package your dags within your container, and do a
rolling update. Add the DAGs as the last layer, and then point the DAGs
folder to the same location. The hard part here is that you need to
gracefuly restart the workers. Currently AFAIK the signals given to the pod
aren't respected. So when the scheduler/webserver/worker receives a
SIGTERM, it should stop the jobs nicely and then exit the container, before
k8s kills the container using a SIGKILL.  This will be challenging with the
workers, which they are potentially long-running. Maybe stop kicking off
new jobs, and let the old ones finish, will be good enough, but then we
need to increase the standard kill timeout substantially. Having this would
also enable the autoscaling of the workers.



Op za 9 mrt. 2019 om 19:07 schreef Maxime Beauchemin <
maximebeauchemin@gmail.com>:

> I want to raise the question of the amount of normalization we want to use
> here as it seems the to be an area that needs more attention.
>
> The SIP suggest having DAG blobs, task blobs and edges (call it the
> fairly-normalized). I also like the idea of all-encompassing (call it
> very-denormalized) DAG blobs as it seems easier to manage in terms of
> versioning. The question here is whether we go with one of these method
> exclusively, something in-between or even a hybrid approach (redundant
> blobs that use different level of normalization).
>
> It's nice and simple to just push or pull DAG atomic objects with a version
> stamp on it. It's clearly simpler than dealing with 3 versioned tables
> (dag, tasks, edges). There are a lot of pros/cons, and they become more
> apparent with the perspective of very large DAGs. If the web server is
> building a "task details page", using the "fairly-normalized" model, it can
> just pull what it needs instead of pulling the large DAG blob. Similarly,
> if building a sub-tree view (a subset of the DAG), perhaps it can only
> retrieve what it needs. But if you need the whole DAG (say for the
> scheduler use case) then you're dealing with more complex SQL/ORM
> operations (joins hopefully, or multiple db round trips)
>
> Now maybe the right approach is more something like 2 tables: DAG and
> task_details, where edges keys are denormalized into DAG (arguably that's a
> few KBs at most, even for large DAGs), and maybe the DAG object has most of
> the high level task metadata information (operator, name, baseoperator key
> attrs), and task_details has the big blobs (SQL code). This is probably a
> nice compromise, the question becomes "how much task-level detail do we
> store in the DAG-centric blog?", probably not much to keep the DAG objects
> as small as possible. The main downside here is that you cannot have the
> database join and have to do 2 round trips to reconstruct a DAG object
> (fetch the DAG, parse the object to get the list of tasks, and then run
> another db query to get those task details).
>
> To resume, I'd qualify the more normalized approach as the most proper, but
> also the more complex. It'll shine in specific cases around large DAGs. If
> we have the proper abstractions (methods like DAG.serialize(),
> DAG.deser(version)) then I guess that's not an issue.
>
> Max
>
> On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <yrqls21@gmail.com> wrote:
>
> > Hi Julian, I'm definitely aligned with you guys on making the webserver
> > independent of DAG parsing, just the end goal to me would be to build a
> > complete story around serializing DAG--and move with the story in mind. I
> > feel like you guys may already have a list of dynamic features we need to
> > deprecate/change, if that is the case feel free to open the discussion on
> > what we do to them with DAG serialization.
> >
> > Julian, Ash, Dan, on 2nd thought I do agree that if we can meet the
> > requirements Dan mentioned, it would be nice to have them stored in the
> DB.
> > Some combined solutions like having a column of serialized graph in the
> > serialized dag table can potentially meet all requirements. What format
> we
> > end up using to represent DAG between components is now less important
> > IMO--fine to refactor those endpoints only need DagModel to use only
> > DagModel, easy to do a batch replacement if we decide otherwise later.
> More
> > important is to define this source of truth for serialized DAG.
> >
> > Ash, ty for the email list, I'll tune my filters accordingly :D I'm
> leaning
> > towards having a separate process for the parser so we got no scheduler
> > dependency etc for this parser but we can discuss this in another thread.
> >
> > On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov <ddavydov@twitter.com.invalid
> >
> > wrote:
> >
> > > >
> > > > Personally I don’t understand why people are pushing for a JSON-based
> > DAG
> > > > representation
> > >
> > > It sounds like you agree that DAGs should be serialized (just in the DB
> > > instead of JSON), so will only address why JSON is better than MySQL
> (AKA
> > > serializing at the DAG level vs the task level) as far as I can see,
> and
> > > not why we need serialization. If you zoom out and look at all the use
> > > cases of serialized DAGs, e.g. having the scheduler use them instead of
> > > parsing DAGs directly, then it becomes clear that we need all
> appropriate
> > > metadata in these DAGs, (operator params, DAG properties, etc), in
> which
> > > case it's not clear how it will fit nicely into a DB table (unless you
> > > wanted to do something like (parent_task_id, task_id, task_params),
> also
> > > keep in mind that we will need to store different versions of each DAG
> in
> > > the future so that we can ensure consistency in a dagrun, i.e. each
> task
> > in
> > > a dagrun uses the same version of a DAG.
> > >
> > > I think some of our requirements should be:
> > > 1. The data model will lead to acceptable performance in all of its
> > > consumers (scheduler, webserver, workers), i.e. no n+1 access patterns
> > (my
> > > biggest concern about serializing at task level as you propose vs at
> DAG
> > > level)
> > > 2. We can have versioning of serialized DAGs
> > > 3. The ability to separate DAGs into their own data store (e.g. no
> > reliance
> > > on joins between the new table and the old one)
> > > 4. One source of truth/serialized representation for DAGs (currently we
> > > have SimpleDAG)
> > >
> > > If we can full-fill all of these requirements and serialize at the task
> > > level rather than the DAG level in the DB, then I agree that probably
> > makes
> > > more sense.
> > >
> > >
> > > > In the proposed PR’s we (Peter, Bas and me) aim to avoid re-parsing
> DAG
> > > > files by querying all the required information from the database. In
> > one
> > > or
> > > > two cases this may however not be possible, in which case we might
> > either
> > > > have to fall back on the DAG file or add the missing information into
> > the
> > > > database. We can tackle these problems as we encounter them.
> > >
> > > I think you would have the support of many of committers in removing
> any
> > > use-cases that stand in the way of full serialization, that being said
> if
> > > we need to remove features we need to do this carefully and
> thoughtfully,
> > > and ideally with proposed alternatives/work-arounds to cover the
> > removals.
> > >
> > > The counter argument: this PR removes the need for the confusing
> > "Refresh"
> > > > button from the UI, and in general you only pay the cost for the
> > > expensive
> > > > DAGs when you ask about them. (I don't know what/when we call the
> > > > /pickle_info endpoint of the top of my head)
> > >
> > > Probably worth splitting out into a separate thread, but I'm actually
> not
> > > sure the refresh button does anything, I think we should double
> check...
> > I
> > > think about 2 years ago there was a commit made that made gunicorn
> > > webservers automatically rotate underneath flask (each one would
> reparse
> > > the DAGbag). Even if it works we should probably remove it since the
> > > webserver refresh interval is pretty fast, and it just causes confusion
> > to
> > > users and implies that the DAGs are not refreshed automatically.
> > >
> > > Do you mean https://json5.org/ or is this a typo? That might be okay
> > for a
> > > > nicer user front end, but the "canonical" version stored in the DB
> > should
> > > > be something "plainer" like just JSON.
> > >
> > > I think he got this from my reply, and it was just an example, but you
> > are
> > > right, I agree JSON would be better than JSON5.
> > >
> > > On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <ash@apache.org>
> wrote:
> > >
> > > > Comments inline.
> > > >
> > > > > On 8 Mar 2019, at 11:28, Kevin Yang <yrqls21@gmail.com> wrote:
> > > > >
> > > > > Hi all,
> > > > > When I was preparing some work related to this AIP I found
> something
> > > > very concerning. I noticed this JIRA ticket <
> > > > https://issues.apache.org/jira/browse/AIRFLOW-3562> is trying to
> > remove
> > > > the dependency of dagbag from webserver, which is awesome--we wanted
> > > badly
> > > > but never got to start work on. However when I looked at some
> subtasks
> > of
> > > > it, which try to remove dagbag dependency from each endpoint, I found
> > the
> > > > way we remove the dependency of dagbag is not very ideal. For example
> > > this
> > > > PR <https://github.com/apache/airflow/pull/4867/files> will require
> us
> > > to
> > > > parse the dag file each time we hit the endpoint.
> > > >
> > > > The counter argument: this PR removes the need for the confusing
> > > "Refresh"
> > > > button from the UI, and in general you only pay the cost for the
> > > expensive
> > > > DAGs when you ask about them. (I don't know what/when we call the
> > > > /pickle_info endpoint of the top of my head)
> > > >
> > > > This end point may be one to hold off on (as it can ask for multiple
> > > dags)
> > > > but there are some that def don't need a full dag bag or to even
> parse
> > > the
> > > > dag file, the current DAG model has enough info.
> > > >
> > > > >
> > > > >
> > > > > If we go down this path, we indeed can get rid of the dagbag
> > dependency
> > > > easily, but we will have to 1. increase the DB load( not too
> concerning
> > > at
> > > > the moment ), 2. wait the DAG file to be parsed before getting the
> page
> > > > back, potentially multiple times. DAG file can sometimes take quite a
> > > while
> > > > to parse, e.g. we have some framework DAG files generating large
> number
> > > of
> > > > DAGs from some static config files or even jupyter notebooks and they
> > can
> > > > take 30+ seconds to parse. Yes we don't like large DAG files but
> people
> > > do
> > > > see the beauty of code as config and sometimes heavily abuseleverage
> > it.
> > > > Assuming all users have the same nice small python file that can be
> > > parsed
> > > > fast, I'm still a bit worried about this approach. Continuing on this
> > > path
> > > > means we've chosen DagModel to be the serialized representation of
> DAG
> > > and
> > > > DB columns to hold different properties--it can be one candidate but
> I
> > > > don't know if we should settle on that now. I would personally
> prefer a
> > > > more compact, e.g. JSON5, and easy to scale representation( such that
> > > > serializing new fields != DB upgrade).
> > > >
> > > > Do you mean https://json5.org/ or is this a typo? That might be okay
> > for
> > > > a nicer user front end, but the "canonical" version stored in the DB
> > > should
> > > > be something "plainer" like just JSON.
> > > >
> > > > I'm not sure that "serializing new fields != DB upgrade" is that big
> > of a
> > > > concern, as we don't add fields that often. One possible way of
> dealing
> > > > with it if we do is to have a hybrid approach - a few distinct
> columns,
> > > but
> > > > then a JSON blob. (and if we were only to support postgres we could
> > just
> > > > use JSONb. But I think our friends at Google may object ;) )
> > > >
> > > > Adding a new column in a DB migration with a default NULL shouldn't
> be
> > an
> > > > expensive operation, or difficult to achieve.
> > > >
> > > >
> > > > >
> > > > > In my imagination we would have to collect the list of dynamic
> > features
> > > > depending on unserializable fields of a DAG and start a
> discussion/vote
> > > on
> > > > dropping support of them( I'm working on this but if anyone has
> already
> > > > done so please take over), decide on the serialized representation
> of a
> > > DAG
> > > > and then replace dagbag with it in webserver. Per previous discussion
> > and
> > > > some offline discussions with Dan, one future of DAG serialization
> > that I
> > > > like would look similar to this:
> > > > >
> > > >
> > > > > https://imgur.com/ncqqQgc
> > > >
> > > > Something I've thought about before for other things was to embed an
> > API
> > > > server _into_ the scheduler - this would be useful for k8s
> > healthchecks,
> > > > native Prometheus metrics without needed statsd bridge, and could
> have
> > > > endpoints to get information such as this directly.
> > > >
> > > > I was thinking it would be _in_ the scheduler process using either
> > > threads
> > > > (ick. Python's still got a GIL doesn't it?) or using async/twisted
> etc.
> > > > (not a side-car process like we have with the logs webserver for
> > `airflow
> > > > worker`).
> > > >
> > > > (This is possibly an unrelated discussion, but might be worth talking
> > > > about?)
> > > >
> > > > > We can still discuss/vote which approach we want to take but I
> don't
> > > > want the door to above design to be shut right now or we have to
> spend
> > a
> > > > lot effort switch path later.
> > > > >
> > > > > Bas and Peter, I'm very sorry to extend the discussion but I do
> think
> > > > this is tightly related to the AIP and PRs behind it. And my sincere
> > > > apology for bringing this up so late( I only pull the open PR list
> > > > occasionally, if there's a way to subscribe to new PR event I'd love
> to
> > > > know how).
> > > >
> > > > It's noisy, but you can subscribe to commits@airflow.apache.org (but
> > be
> > > > warned, this also includes all Jira tickets, edits of every comment
> on
> > > > github etc.).
> > > >
> > > >
> > > > >
> > > > > Cheers,
> > > > > Kevin Y
> > > > >
> > > > > On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof <
> > pjrvanthof@gmail.com
> > > > <mailto:pjrvanthof@gmail.com>> wrote:
> > > > > Hi all,
> > > > >
> > > > > Just some comments one the point Bolke dit give in relation of my
> PR.
> > > > >
> > > > > At first, the main focus is: making the webserver stateless.
> > > > >
> > > > > > 1) Make the webserver stateless: needs the graph of the *current*
> > dag
> > > > >
> > > > > This is the main goal but for this a lot more PR’s will be coming
> > once
> > > > my current is merged. For edges and graph view this is covered in my
> PR
> > > > already.
> > > > >
> > > > > > 2) Version dags: for consistency mainly and not requiring parsing
> > of
> > > > the
> > > > > > dag on every loop
> > > > >
> > > > > In my PR the historical graphs will be stored for each DagRun. This
> > > > means that you can see if an older DagRun was the same graph
> structure,
> > > > even if some tasks does not exists anymore in the current graph.
> > > Especially
> > > > for dynamic DAG’s this is very useful.
> > > > >
> > > > > > 3) Make the scheduler not require DAG files. This could be done
> if
> > > the
> > > > > > edges contain all information when to trigger the next task.
We
> can
> > > > then
> > > > > > have event driven dag parsing outside of the scheduler loop,
ie.
> by
> > > the
> > > > > > cli. Storage can also be somewhere else (git, artifactory,
> > > filesystem,
> > > > > > whatever).
> > > > >
> > > > > The scheduler is almost untouched in this PR. The only thing that
> is
> > > > added is that this edges are saved to the database but the scheduling
> > > > itself din’t change. The scheduler depends now still on the DAG
> object.
> > > > >
> > > > > > 4) Fully serialise the dag so it becomes transferable to workers
> > > > >
> > > > > It nice to see that people has a lot of idea’s about this. But
as
> > Fokko
> > > > already mentioned this is out of scope for the issue what we are
> trying
> > > to
> > > > solve. I also have some idea’s about this but I like to limit this
> > PR/AIP
> > > > to the webserver.
> > > > >
> > > > > For now my PR does solve 1 and 2 and the rest of the behaviour
> (like
> > > > scheduling) is untouched.
> > > > >
> > > > > Gr,
> > > > > Peter
> > > > >
> > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message