airflow-dev mailing list archives

From Ash Berlin-Taylor <>
Subject Re: [DISCUSS] AIP-12 Persist DAG into DB
Date Thu, 18 Jul 2019 09:46:05 GMT
We didn't reach any conclusion on this yet, but I agree, and this is the big
task that we at Astronomer are going to work on next for Airflow.

I've started chatting to a few of the other committers about this to get an
idea of people's priorities, and have had a chat with Alex at Uber about
their experiences of making their internal fork of Airflow - Piper.

I'll create something in the wiki (probably not an AIP to start with) to collect the possible
approaches and downsides/limitations.

Watch this space.


> On 18 Jul 2019, at 07:05, Tao Feng <> wrote:
> Did we reach any consensus on this topic/AIP? I think persisting DAGs is
> pretty important actually.
> -Tao
> On Tue, Mar 12, 2019 at 3:01 AM Kevin Yang <> wrote:
>> Hi Fokko,
>>
>> As a large cluster maintainer, I'm not a big fan of large DAG files
>> either. But I'm not sure if I'd consider this bad practice. We have some
>> large frameworks, e.g. experimentation and machine learning, that are
>> complex by nature and generate a large number of DAGs from their
>> customer configs to get better flexibility. I consider them advanced
>> use cases of Airflow that open up a lot of potential for Airflow,
>> unless we've previously set some boundaries around how complex DAG code
>> can be that I'm not aware of. About resulting in an unworkable
>> situation: yes, we are experiencing pain from having such large DAG
>> files, mainly on the webserver side, but the system overall is running
>> stably. We are actually hoping to improve the situation by applying
>> solutions like making the webserver stateless. It is ok if the owners
>> of large DAG files need to pay a price, but we should try to minimize
>> it—a longer refresh interval, extra task running time, but nothing too
>> crazy.
>>
>> I think we're aligned on storing info in the DB as long as we can meet
>> the requirements Dan mentioned earlier—we just need that balance
>> decided, so I'm gonna skip this part (out of all the requirements,
>> No. 1 seems to be the least clear, maybe we can expand on that). One
>> thing about the proposed idea is that we implicitly couple DagRun with
>> DAG version, which at first glance makes sense but imo is not very
>> ideal. I feel full versioning should track all changes instead of
>> tracking changes only when we create a DagRun. E.g. my task failed, I
>> merged new code to fix my task, and I want to rerun it with the current
>> code; if we serialize the DAG at DagRun creation time we won't have the
>> up-to-date snapshot—sure, we can work around it by always keeping a
>> current snapshot of the DAG, but this is kinda messy and confusing.
>> This is what popped up off the top of my head, and w/o full versioning
>> we might have some other tricky cases, e.g. your backfill case. But I
>> have only given this a little thought, and you might already have a
>> complete story that will void my concerns.
>>
>> Cheers,
>> Kevin Y
>> On Sun, Mar 10, 2019 at 11:29 AM Driesprong, Fokko <> wrote:
>>> Thanks Kevin for opening the discussion. I think it is important to
>>> have a clear overview on how to approach the AIP.
>>>
>>> First of all, how many DAGs do we have that take 30s to parse? I
>>> consider this bad practice, and this would also result in an
>>> unworkable situation with the current setup of Airflow, since it will
>>> take a lot of resources on the webserver/scheduler, and the whole
>>> system will become unresponsive. It will be hard to cope with such
>>> DAGs in general.
>>>
>>> The idea from the AIP is to have a versioned copy of the dag in the
>>> DB, so in the end, you won't need to parse the whole thing every time.
>>> Only when you trigger a DAG, or when you want to see the current
>>> status of the dag.
>>>
>>> Like stated earlier, I strongly feel we shouldn't serialize the DAGs
>>> as JSON(5) or pickles in general. For me, this is deferring the pain
>>> of setting up a structure for the DAG object itself. Having the DAG
>>> denormalized in the database will give us cleaner storage of our DAG.
>>> We can, for example, enforce fields by making them not null, so we
>>> know that something is off at write time, instead of at read time.
>>> Furthermore, we're missing logical types such as dates, which we can
>>> query efficiently using the indices of the database.
>>>
>>> Also, with all serialization formats, evolution isn't trivial.
>>> Consider the situations when:
>>> - We're introducing a new field, and it might be null, therefore we
>>> need to bake all kinds of logic into the Airflow code, which you don't
>>> want. With proper migration scripts, you could prefill these fields,
>>> and make them not null (a sketch of such a migration follows below).
>>> - We're changing the models; for example, you still can't change a
>>> string type into an integer without adding custom logic. In this case,
>>> the reviewer needs to be extra careful that there are no breaking
>>> changes introduced. Right now we're doing minimal forward- and
>>> backward-compatibility testing.
>>> In case we get too many migrations, we could also squash (some of
>>> them) when preparing the release.
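>>>
>>> As a rough sketch of the prefill idea (the table and column names here
>>> are purely illustrative, not an actual Airflow migration):
>>>
>>>     # add the column as nullable, prefill it, then enforce NOT NULL so
>>>     # incomplete rows fail at write time instead of read time
>>>     import sqlalchemy as sa
>>>     from alembic import op
>>>
>>>     def upgrade():
>>>         op.add_column('serialized_dag',
>>>                       sa.Column('fileloc', sa.String(2000),
>>>                                 nullable=True))
>>>         op.execute("UPDATE serialized_dag SET fileloc = '' "
>>>                    "WHERE fileloc IS NULL")
>>>         op.alter_column('serialized_dag', 'fileloc',
>>>                         existing_type=sa.String(2000), nullable=False)
>>>
>>>     def downgrade():
>>>         op.drop_column('serialized_dag', 'fileloc')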
>>> Personally, I don't think the serialization is the issue here. As Max
>>> already mentioned, it is about the optimal balance of
>>> (de)normalization. From the user perspective, the serialization won't
>>> change much of the behaviour of Airflow.
>>>
>>> For me, having `DAG.serialize()` and `DAG.deser(version)` is not the
>>> ideal approach. But it might be that we're on the same page :-) I
>>> believe it should be something like `DagRun.find('fokkos_dag',
>>> datetime(2018, 3, 1))`, which constructs the correct version of the
>>> dag. Since there is a uniqueness constraint on (dag_id, datetime),
>>> this will always return the same dag. You will get the versioned
>>> DagRun as it ran at that time. Serializing the fields and storing them
>>> in the database should happen transparently when you update the DAG
>>> object. When you run a dag, you'll parse the dag, and then run it.
>>> `Dag().create_dagrun(...)` will create a DagRun, as the name suggests;
>>> if the version of the dag still exists in the database, it will reuse
>>> that one, otherwise it will create a new version of the DAG (with all
>>> the operators etc). In this sense the versioning of the DAGs should be
>>> done within the Dag(Run).
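>>>
>>> A minimal sketch of the semantics I have in mind, using plain dicts as
>>> stand-ins for the two tables (names are illustrative, not a finished
>>> design):
>>>
>>>     from datetime import datetime
>>>
>>>     serialized_dags = {}   # version_id -> serialized dag
>>>     dag_runs = {}          # (dag_id, execution_date) -> version_id
>>>
>>>     def create_dagrun(dag_id, execution_date, dag_blob):
>>>         # reuse the stored version if this exact dag already exists,
>>>         # otherwise store it as a new version
>>>         version_id = next((v for v, blob in serialized_dags.items()
>>>                            if blob == dag_blob), None)
>>>         if version_id is None:
>>>             version_id = len(serialized_dags) + 1
>>>             serialized_dags[version_id] = dag_blob
>>>         dag_runs[(dag_id, execution_date)] = version_id
>>>
>>>     def find(dag_id, execution_date):
>>>         # the uniqueness constraint on (dag_id, execution_date) means
>>>         # this always resolves to the same dag version
>>>         return serialized_dags[dag_runs[(dag_id, execution_date)]]
>>>
>>>     create_dagrun('fokkos_dag', datetime(2018, 3, 1), {'tasks': ['a']})
>>>     assert find('fokkos_dag', datetime(2018, 3, 1)) == {'tasks': ['a']}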
>>> The versioning will change the behaviour from a user perspective.
>>> Right now we store only a single version. For example, the poor man's
>>> backfilling won't work anymore. This is clearing the state from
>>> past & future, up- and downstream, and letting it catch up again. In
>>> this case, the old version of the DAG won't exist anymore, and
>>> potentially there are tasks that aren't in the code anymore. In this
>>> case we need to clear the version of the dag, and rerun it with the
>>> latest version: `DagRun.find('fokkos_dag', datetime(2018, 3,
>>> 1)).clear()`. How we are going to do clears downstream in the middle
>>> of the dag is something I still have to figure out. Because
>>> potentially there are tasks that can't be rerun because the underlying
>>> Python code has changed, both on the user level and on the Airflow
>>> level. It will be impossible to get these features pure in that sense.
>>>
>>> I would not suggest adding a new status here, indicating that the task
>>> can't be rerun since it isn't part of the DAG anymore. We have to find
>>> the balance here between adding complexity (also to the scheduler) and
>>> the features that we need to introduce to help the user.
>>>
>>> Cheers, Fokko
>>> Ps. Jarek, interesting idea. It shouldn't be too hard to make Airflow
>>> more k8s native. You could package your dags within your container,
>>> and do a rolling update. Add the DAGs as the last layer, and then
>>> point the DAGs folder to the same location. The hard part here is that
>>> you need to gracefully restart the workers. Currently AFAIK the
>>> signals given to the pod aren't respected. So when the
>>> scheduler/webserver/worker receives a SIGTERM, it should stop the jobs
>>> nicely and then exit the container, before k8s kills the container
>>> using a SIGKILL. This will be challenging with the workers, as they
>>> are potentially long-running. Maybe stopping kicking off new jobs, and
>>> letting the old ones finish, will be good enough, but then we need to
>>> increase the standard kill timeout substantially. Having this would
>>> also enable autoscaling of the workers.
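>>>
>>> A minimal sketch of the drain behaviour I mean, assuming a toy worker
>>> loop (illustrative only; the real workers are structured differently):
>>>
>>>     import signal
>>>     import time
>>>
>>>     shutting_down = False
>>>
>>>     def handle_sigterm(signum, frame):
>>>         # stop picking up new work; let the running jobs finish
>>>         global shutting_down
>>>         shutting_down = True
>>>
>>>     signal.signal(signal.SIGTERM, handle_sigterm)
>>>
>>>     def worker_loop(queue, running):
>>>         while running or not shutting_down:
>>>             if not shutting_down and queue:
>>>                 running.append(queue.pop(0))  # accept a new job
>>>             if running:
>>>                 running.pop(0)                # pretend a job finished
>>>             time.sleep(1)
>>>         # we only get here once the running jobs have drained, so the
>>>         # k8s termination grace period must be long enough for this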
>>> On Sat, 9 Mar 2019 at 19:07, Maxime Beauchemin <> wrote:
>>>> I want to raise the question of the amount of normalization we want
>>>> to use here, as it seems to be an area that needs more attention.
>>>>
>>>> The AIP suggests having DAG blobs, task blobs and edges (call it the
>>>> fairly-normalized approach). I also like the idea of all-encompassing
>>>> (call it very-denormalized) DAG blobs, as it seems easier to manage
>>>> in terms of versioning. The question here is whether we go with one
>>>> of these methods exclusively, something in-between, or even a hybrid
>>>> approach (redundant blobs that use different levels of
>>>> normalization).
>>>>
>>>> It's nice and simple to just push or pull atomic DAG objects with a
>>>> version stamp on them. It's clearly simpler than dealing with 3
>>>> versioned tables (dag, tasks, edges). There are a lot of pros/cons,
>>>> and they become more apparent with the perspective of very large
>>>> DAGs. If the web server is building a "task details page", using the
>>>> "fairly-normalized" model, it can just pull what it needs instead of
>>>> pulling the large DAG blob. Similarly, if building a sub-tree view (a
>>>> subset of the DAG), perhaps it can only retrieve what it needs. But
>>>> if you need the whole DAG (say for the scheduler use case) then
>>>> you're dealing with more complex SQL/ORM operations (joins hopefully,
>>>> or multiple db round trips).
>>>>
>>>> Now maybe the right approach is more something like 2 tables: dag and
>>>> task_details, where edge keys are denormalized into dag (arguably
>>>> that's a few KBs at most, even for large DAGs), and maybe the DAG
>>>> object has most of the high-level task metadata (operator, name,
>>>> key BaseOperator attrs), and task_details has the big blobs (SQL
>>>> code). This is probably a nice compromise; the question becomes "how
>>>> much task-level detail do we store in the DAG-centric blob?",
>>>> probably not much, to keep the DAG objects as small as possible. The
>>>> main downside here is that you cannot have the database join and have
>>>> to do 2 round trips to reconstruct a DAG object (fetch the DAG, parse
>>>> the object to get the list of tasks, and then run another db query to
>>>> get those task details).
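>>>>
>>>> Roughly, the 2-table layout I'm describing (a sketch; names and
>>>> columns are illustrative):
>>>>
>>>>     import sqlalchemy as sa
>>>>
>>>>     metadata = sa.MetaData()
>>>>
>>>>     # small, versioned blob: edge keys plus high-level task metadata
>>>>     dag = sa.Table(
>>>>         'dag', metadata,
>>>>         sa.Column('dag_id', sa.String(250), primary_key=True),
>>>>         sa.Column('version', sa.Integer, primary_key=True),
>>>>         sa.Column('edges', sa.Text, nullable=False),
>>>>         sa.Column('task_summary', sa.Text, nullable=False))
>>>>
>>>>     # the big per-task blobs (e.g. SQL code) live separately, so the
>>>>     # webserver can fetch one task without pulling the whole DAG
>>>>     task_details = sa.Table(
>>>>         'task_details', metadata,
>>>>         sa.Column('dag_id', sa.String(250), primary_key=True),
>>>>         sa.Column('version', sa.Integer, primary_key=True),
>>>>         sa.Column('task_id', sa.String(250), primary_key=True),
>>>>         sa.Column('blob', sa.Text, nullable=False))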
>>>> To sum up, I'd qualify the more normalized approach as the most
>>>> proper, but also the more complex. It'll shine in specific cases
>>>> around large DAGs. If we have the proper abstractions (methods like
>>>> `DAG.serialize()` and `DAG.deser(version)`) then I guess that's not
>>>> an issue.
>>>>
>>>> Max
>>>> On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <> wrote:
>>>>> Hi Julian, I'm definitely aligned with you guys on making the
>>>>> webserver independent of DAG parsing; it's just that the end goal to
>>>>> me would be to build a complete story around serializing the
>>>>> DAG--and to move with that story in mind. I feel like you guys may
>>>>> already have a list of dynamic features we need to deprecate/change;
>>>>> if that is the case, feel free to open the discussion on what we do
>>>>> to them with DAG serialization.
>>>>>
>>>>> Julian, Ash, Dan: on 2nd thought I do agree that if we can meet the
>>>>> requirements Dan mentioned, it would be nice to have them stored in
>>>>> the DB. Some combined solutions, like having a column of serialized
>>>>> graph in the serialized dag table, can potentially meet all the
>>>>> requirements. What format we end up using to represent the DAG
>>>>> between components is now less important IMO--it's fine to refactor
>>>>> those endpoints that only need DagModel to use only DagModel, and
>>>>> easy to do a batch replacement if we decide otherwise later. More
>>>>> important is to define this source of truth for the serialized DAG.
>>>>>
>>>>> Ash, ty for the email list, I'll tune my filters accordingly :D I'm
>>>>> leaning towards having a separate process for the parser so we get
>>>>> no scheduler dependency etc. for this parser, but we can discuss
>>>>> this in another thread.
>>>>> On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov <> wrote:
>>>>>>> Personally I don’t understand why people are pushing for a
>>>>>>> JSON-based DAG representation
>>>>>>
>>>>>> It sounds like you agree that DAGs should be serialized (just in
>>>>>> the DB instead of JSON), so as far as I can see I will only address
>>>>>> why JSON is better than MySQL (AKA serializing at the DAG level vs
>>>>>> the task level), and not why we need serialization. If you zoom out
>>>>>> and look at all the use cases of serialized DAGs, e.g. having the
>>>>>> scheduler use them instead of parsing DAGs directly, then it
>>>>>> becomes clear that we need all appropriate metadata in these DAGs
>>>>>> (operator params, DAG properties, etc), in which case it's not
>>>>>> clear how it will fit nicely into a DB table (unless you wanted to
>>>>>> do something like (parent_task_id, task_id, task_params)). Also
>>>>>> keep in mind that we will need to store different versions of each
>>>>>> DAG in the future so that we can ensure consistency in a dagrun,
>>>>>> i.e. each task in a dagrun uses the same version of a DAG.
>>>>>>
>>>>>> I think some of our requirements should be:
>>>>>> 1. The data model will lead to acceptable performance in all of its
>>>>>> consumers (scheduler, webserver, workers), i.e. no n+1 access
>>>>>> patterns (my biggest concern about serializing at the task level as
>>>>>> you propose vs at the DAG level)
>>>>>> 2. We can have versioning of serialized DAGs
>>>>>> 3. The ability to separate DAGs into their own data store (e.g. no
>>>>>> reliance on joins between the new table and the old one)
>>>>>> 4. One source of truth/serialized representation for DAGs
>>>>>> (currently we have SimpleDAG)
>>>>>>
>>>>>> If we can fulfill all of these requirements and serialize at the
>>>>>> task level rather than the DAG level in the DB, then I agree that
>>>>>> probably makes more sense.
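>>>>>>
>>>>>> To make the DAG-level option concrete, a single serialized blob
>>>>>> could look roughly like this (purely illustrative, not a proposed
>>>>>> schema):
>>>>>>
>>>>>>     serialized_dag = {
>>>>>>         "dag_id": "example_dag",
>>>>>>         "version": 3,
>>>>>>         "properties": {"schedule_interval": "@daily",
>>>>>>                        "catchup": False},
>>>>>>         "tasks": {
>>>>>>             "extract": {"operator": "BashOperator",
>>>>>>                         "params": {"bash_command": "echo extract"}},
>>>>>>             "load": {"operator": "PythonOperator", "params": {}},
>>>>>>         },
>>>>>>         "edges": [("extract", "load")],
>>>>>>     }
>>>>>>
>>>>>> One fetch returns everything a dagrun needs (no n+1 access
>>>>>> pattern), at the cost of pulling the whole blob even when a
>>>>>> consumer only needs a single task.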
>>>>>>> In the proposed PRs we (Peter, Bas and me) aim to avoid re-parsing
>>>>>>> DAG files by querying all the required information from the
>>>>>>> database. In one or two cases this may however not be possible, in
>>>>>>> which case we might either have to fall back on the DAG file or
>>>>>>> add the missing information into the database. We can tackle these
>>>>>>> problems as we encounter them.
>>>>>>
>>>>>> I think you would have the support of many of the committers in
>>>>>> removing any use-cases that stand in the way of full serialization.
>>>>>> That being said, if we need to remove features we need to do this
>>>>>> carefully and thoughtfully, and ideally with proposed
>>>>>> alternatives/work-arounds to cover the removals.
>>>>>>
>>>>>>> The counter argument: this PR removes the need for the confusing
>>>>>>> "Refresh" button from the UI, and in general you only pay the cost
>>>>>>> for expensive DAGs when you ask about them. (I don't know
>>>>>>> what/when we call the /pickle_info endpoint off the top of my
>>>>>>> head)
>>>>>>
>>>>>> Probably worth splitting out into a separate thread, but I'm
>>>>>> actually not sure the refresh button does anything, I think we
>>>>>> should double check... I think about 2 years ago there was a commit
>>>>>> made that made gunicorn webservers automatically rotate underneath
>>>>>> flask (each one would reparse the DAGbag). Even if it works we
>>>>>> should probably remove it, since the webserver refresh interval is
>>>>>> pretty fast, and it just causes confusion to users and implies that
>>>>>> the DAGs are not refreshed automatically.
>>>>>>
>>>>>>> Do you mean or is this a typo? That might be okay for a nicer user
>>>>>>> front end, but the "canonical" version stored in the DB should be
>>>>>>> something "plainer" like just JSON.
>>>>>>
>>>>>> I think he got this from my reply, and it was just an example, but
>>>>>> you are right, I agree JSON would be better than JSON5.
>>>>>> On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <> wrote:
>>>>>>> Comments inline.
>>>>>>>
>>>>>>>> On 8 Mar 2019, at 11:28, Kevin Yang <> wrote:
>>>>>>>> Hi all,
>>>>>>>> When I was preparing some work related to this AIP I found
>>>>>>>> something very concerning. I noticed this JIRA ticket <> is
>>>>>>>> trying to remove the dependency on the dagbag from the webserver,
>>>>>>>> which is awesome--we wanted it badly but never got to start work
>>>>>>>> on it. However, when I looked at some subtasks of it, which try
>>>>>>>> to remove the dagbag dependency from each endpoint, I found the
>>>>>>>> way we remove the dependency on the dagbag is not very ideal. For
>>>>>>>> example this PR <> requires us to parse the dag file each time we
>>>>>>>> hit the endpoint.
>>>>>>>
>>>>>>> The counter argument: this PR removes the need for the confusing
>>>>>>> "Refresh" button from the UI, and in general you only pay the cost
>>>>>>> for expensive DAGs when you ask about them. (I don't know
>>>>>>> what/when we call the /pickle_info endpoint off the top of my
>>>>>>> head)
>>>>>>>
>>>>>>> This end point may be one to hold off on (as it can ask for
>>>>>>> multiple dags) but there are some that def don't need a full dag
>>>>>>> bag or to even parse the dag file; the current DAG model has
>>>>>>> enough info.
>>>>>>>> If we go down this path, we indeed can get rid of the dagbag
>>>>>>>> dependency easily, but we will have to 1. increase the DB load
>>>>>>>> (not too concerning at the moment), 2. wait for the DAG file to
>>>>>>>> be parsed before getting the page back, potentially multiple
>>>>>>>> times. A DAG file can sometimes take quite a while to parse, e.g.
>>>>>>>> we have some framework DAG files generating a large number of
>>>>>>>> DAGs from some static config files or even jupyter notebooks;
>>>>>>>> they can take 30+ seconds to parse. Yes, we don't like large DAG
>>>>>>>> files, but people do see the beauty of code as config and
>>>>>>>> sometimes heavily abuse/leverage it. Assuming all users have the
>>>>>>>> same nice small python file that can be parsed fast, I'm still a
>>>>>>>> bit worried about this approach. Continuing down this path means
>>>>>>>> we've chosen DagModel to be the serialized representation of a
>>>>>>>> DAG and DB columns to hold the different properties--it can be
>>>>>>>> one candidate but I don't know if we should settle on that now. I
>>>>>>>> would personally prefer a more compact (e.g. JSON5) and
>>>>>>>> easy-to-scale representation (such that serializing new fields !=
>>>>>>>> DB upgrade).
>>>>>>> Do you mean or is this a typo? That might be okay for a nicer user
>>>>>>> front end, but the "canonical" version stored in the DB should be
>>>>>>> something "plainer" like just JSON.
>>>>>>>
>>>>>>> I'm not sure that "serializing new fields != DB upgrade" is that
>>>>>>> big of a concern, as we don't add fields that often. One possible
>>>>>>> way of dealing with it, if we do, is to have a hybrid approach - a
>>>>>>> few distinct columns, but then a JSON blob. (And if we were only
>>>>>>> to support postgres we could just use JSONB. But I think our
>>>>>>> friends at Google may object ;) )
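>>>>>>>
>>>>>>> Roughly what I mean by hybrid (a sketch; names are illustrative):
>>>>>>>
>>>>>>>     import sqlalchemy as sa
>>>>>>>
>>>>>>>     metadata = sa.MetaData()
>>>>>>>
>>>>>>>     serialized_dag = sa.Table(
>>>>>>>         'serialized_dag', metadata,
>>>>>>>         # distinct columns for the fields we query and index on
>>>>>>>         sa.Column('dag_id', sa.String(250), primary_key=True),
>>>>>>>         sa.Column('version', sa.Integer, primary_key=True),
>>>>>>>         sa.Column('last_updated', sa.DateTime, nullable=False),
>>>>>>>         # everything else goes into a blob, so adding a field
>>>>>>>         # doesn't need a schema migration; sa.JSON works across
>>>>>>>         # backends, not just postgres JSONB
>>>>>>>         sa.Column('data', sa.JSON, nullable=False))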
>>>>>>> Adding a new column in a DB migration with a default NULL
>>>>>>> shouldn't be an expensive operation, or difficult to achieve.
>>>>>>>> In my imagination we would have to collect the list of dynamic
>>>>>>>> features depending on unserializable fields of a DAG and start a
>>>>>>>> discussion/vote on dropping support for them (I'm working on this
>>>>>>>> but if anyone has already done so please take over), decide on
>>>>>>>> the serialized representation of a DAG and then replace the
>>>>>>>> dagbag with it in the webserver. Per previous discussion and some
>>>>>>>> offline discussions with Dan, one future of DAG serialization
>>>>>>>> that I like would look similar to this:
>>>>>>> Something I've thought about before for other things was to embed
>>>>>>> an API server _into_ the scheduler - this would be useful for k8s
>>>>>>> healthchecks, native Prometheus metrics without needing a statsd
>>>>>>> bridge, and it could have endpoints to get information such as
>>>>>>> this directly.
>>>>>>>
>>>>>>> I was thinking it would be _in_ the scheduler process using either
>>>>>>> threads (ick. Python's still got a GIL doesn't it?) or using
>>>>>>> async/twisted etc. (not a side-car process like we have with the
>>>>>>> logs webserver in `airflow worker`).
>>>>>>>
>>>>>>> (This is possibly an unrelated discussion, but might be worth
>>>>>>> talking about?)
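>>>>>>>
>>>>>>> The threaded variant could be as small as this (a sketch, not a
>>>>>>> design; the port and names are made up, and the GIL barely matters
>>>>>>> here since the handler does almost no work):
>>>>>>>
>>>>>>>     import threading
>>>>>>>     from http.server import BaseHTTPRequestHandler, HTTPServer
>>>>>>>
>>>>>>>     class HealthHandler(BaseHTTPRequestHandler):
>>>>>>>         def do_GET(self):
>>>>>>>             # a k8s liveness probe would point at this endpoint
>>>>>>>             if self.path == '/health':
>>>>>>>                 self.send_response(200)
>>>>>>>                 self.end_headers()
>>>>>>>                 self.wfile.write(b'OK')
>>>>>>>             else:
>>>>>>>                 self.send_error(404)
>>>>>>>
>>>>>>>     def start_api_server(port=8080):
>>>>>>>         server = HTTPServer(('0.0.0.0', port), HealthHandler)
>>>>>>>         # daemon thread: it dies with the scheduler process
>>>>>>>         t = threading.Thread(target=server.serve_forever,
>>>>>>>                              daemon=True)
>>>>>>>         t.start()
>>>>>>>         return server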
>>>>>>>> We can still discuss/vote on which approach we want to take, but
>>>>>>>> I don't want the door to the above design to be shut right now,
>>>>>>>> or we will have to spend a lot of effort switching paths later.
>>>>>>>>
>>>>>>>> Bas and Peter, I'm very sorry to extend the discussion, but I do
>>>>>>>> think this is tightly related to the AIP and the PRs behind it.
>>>>>>>> And my sincere apology for bringing this up so late (I only pull
>>>>>>>> the open PR list occasionally; if there's a way to subscribe to
>>>>>>>> new PR events I'd love to know how).
>>>>>>>
>>>>>>> It's noisy, but you can subscribe to (but be warned, this also
>>>>>>> includes all Jira tickets, edits of every comment on github etc.).
>>>>>>>> Cheers,
>>>>>>>> Kevin Y
>>>>>>>> On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof <> wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Just some comments on the points Bolke gave in relation to my PR.
>>>>>>>> First of all, the main focus is: making the webserver stateless.
>>>>>>>>
>>>>>>>>> 1) Make the webserver stateless: needs the graph of the
>>>>>>>>> *current* dag
>>>>>>>>
>>>>>>>> This is the main goal, but for this a lot more PRs will be coming
>>>>>>>> once my current one is merged. For edges and the graph view this
>>>>>>>> is covered by my PR already.
>>>>>>>>
>>>>>>>>> 2) Version dags: for consistency mainly and not requiring
>>>>>>>>> parsing of the dag on every loop
>>>>>>>>
>>>>>>>> In my PR the historical graphs will be stored for each DagRun.
>>>>>>>> This means that you can see if an older DagRun had the same graph
>>>>>>>> structure, even if some tasks do not exist anymore in the current
>>>>>>>> graph. Especially for dynamic DAGs this is very useful.
>>>>>>>>
>>>>>>>>> 3) Make the scheduler not require DAG files. This could be done
>>>>>>>>> if the edges contain all information on when to trigger the
>>>>>>>>> next. We can then have event-driven dag parsing outside of the
>>>>>>>>> scheduler, ie. by the cli. Storage can also be somewhere else
>>>>>>>>> (git, artifactory, filesystem, whatever).
>>>>>>>>
>>>>>>>> The scheduler is almost untouched in this PR. The only thing that
>>>>>>>> is added is that these edges are saved to the database, but the
>>>>>>>> scheduling itself didn't change. The scheduler still depends on
>>>>>>>> the DAG object.
>>>>>>>>
>>>>>>>>> 4) Fully serialise the dag so it becomes transferable to workers
>>>>>>>>
>>>>>>>> It is nice to see that people have a lot of ideas about this. But
>>>>>>>> as Fokko already mentioned, this is out of scope for the issue we
>>>>>>>> are trying to solve. I also have some ideas about this, but I
>>>>>>>> would like to limit this PR/AIP to the webserver.
>>>>>>>>
>>>>>>>> For now my PR does solve 1 and 2, and the rest of the behaviour
>>>>>>>> (like scheduling) is untouched.
>>>>>>>>
>>>>>>>> Gr,
>>>>>>>> Peter
