airflow-dev mailing list archives

From Gabriel Silk <gs...@dropbox.com.INVALID>
Subject Re: [DISCUSS] AIP-12 Persist DAG into DB
Date Wed, 24 Jul 2019 17:36:32 GMT
I'm really excited about this feature, and I'd love to be able to provide
feedback on the proposed design.

On Thu, Jul 18, 2019 at 10:21 AM Tao Feng <fengtao04@gmail.com> wrote:

> Thanks Ash. This will be huge!
>
> On Thu, Jul 18, 2019 at 4:00 AM Jarek Potiuk <Jarek.Potiuk@polidea.com>
> wrote:
>
> > Cool!
> >
> > On Thu, Jul 18, 2019 at 11:46 AM Ash Berlin-Taylor <ash@apache.org>
> wrote:
> >
> > > We didn't reach any conclusion on this yet but I agree, and this is the
> > > big task that we at Astronomer are going to work on next for Airflow.
> > >
> > > I've started chatting to a few of the other committers about this to
> get
> > a
> > > an idea of people's priorities, and have had a chat with Alex at Uber
> > about
> > > their experiences of making their internal fork of Airflow - Piper
> > > https://eng.uber.com/managing-data-workflows-at-scale/
> > >
> > > I'll create something in the wiki (probably not an AIP to start with)
> to
> > > collect the possible approaches and downsides/limitations.
> > >
> > > Watch this space.
> > >
> > > -ash
> > >
> > > > On 18 Jul 2019, at 07:05, Tao Feng <fengtao04@gmail.com> wrote:
> > > >
> > > > Do we reach any consensus on this topic /AIP? I think persisting DAG
> is
> > > > pretty important actually.
> > > >
> > > > -Tao
> > > >
> > > > On Tue, Mar 12, 2019 at 3:01 AM Kevin Yang <yrqls21@gmail.com>
> wrote:
> > > >
> > > >> Hi Fokko,
> > > >>
> > > >> As a large cluster maintainer, I'm not a big fan of large DAG files
> > > >> either. But I'm not sure I'd consider them bad practice. We have some
> > > >> large frameworks, e.g. experimentation and machine learning, that are
> > > >> complex by nature and generate a large number of DAGs from their
> > > >> customers' configs for better flexibility. I consider these advanced
> > > >> use cases of Airflow that open up a lot of potential for Airflow,
> > > >> unless we've previously set boundaries around how complex DAG code can
> > > >> be that I'm not aware of. As for resulting in an unworkable situation:
> > > >> yes, we are experiencing pain from such large DAG files, mainly on the
> > > >> webserver side, but the system overall runs stably. We are actually
> > > >> hoping to improve the situation with solutions like making the
> > > >> webserver stateless. It is fine if the owners of large DAG files have
> > > >> to pay a price, but we should try to minimize it: a longer refresh
> > > >> interval or some extra task running time, but nothing too crazy.
> > > >>
> > > >>
> > > >> I think we're aligned on storing info in the DB as long as we can meet
> > > >> the requirements Dan mentioned earlier; we just need that balance
> > > >> decided, so I'm going to skip this part (out of all the requirements,
> > > >> No. 1 seems the least clear, maybe we can expand on that). One thing
> > > >> about the proposed idea is that we implicitly couple DagRun with DAG
> > > >> version, which at first glance makes sense but IMO is not ideal. I
> > > >> feel full versioning should track all changes, not only the changes
> > > >> captured when we create a DagRun. E.g. my task failed, I merged new
> > > >> code to fix it, and I want to rerun it with the current code; if we
> > > >> serialize the DAG only at DagRun creation time, we won't have an
> > > >> up-to-date snapshot. Sure, we can work around it by always keeping a
> > > >> current snapshot of the DAG, but that is messy and confusing. This is
> > > >> just what popped into my head, and without full versioning we might
> > > >> hit other tricky cases, e.g. your backfill case. But I've only given
> > > >> this a few thoughts, and you might already have a complete story that
> > > >> voids my concerns.
> > > >>
> > > >>
> > > >> Cheers,
> > > >> Kevin Y
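
Kevin's "track all changes, not just at DagRun creation" idea could be sketched by keying snapshots on a content hash, so a new version appears whenever the serialized DAG actually changes. This is purely an illustration; `DagVersionStore`, `record`, and `fetch` are hypothetical names, not Airflow APIs:

```python
import hashlib
import json


class DagVersionStore:
    """Keep one snapshot per distinct serialized DAG, keyed by content hash."""

    def __init__(self):
        self._snapshots = {}  # (dag_id, content_hash) -> serialized dict

    def record(self, dag_id, serialized_dag):
        # Canonical JSON (sorted keys) so an unchanged DAG hashes identically.
        blob = json.dumps(serialized_dag, sort_keys=True).encode()
        content_hash = hashlib.sha256(blob).hexdigest()
        self._snapshots[(dag_id, content_hash)] = serialized_dag
        return content_hash

    def fetch(self, dag_id, content_hash):
        return self._snapshots[(dag_id, content_hash)]


store = DagVersionStore()
v1 = store.record("my_dag", {"tasks": ["extract", "load"]})
v2 = store.record("my_dag", {"tasks": ["extract", "transform", "load"]})
# Two distinct versions now exist; re-recording an unchanged DAG is a no-op.
assert v1 != v2
assert store.record("my_dag", {"tasks": ["extract", "load"]}) == v1
```

A parser could call `record` on every parse, independent of DagRun creation, which is one way to get the "always up-to-date snapshot" without special-casing it.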
> > > >>
> > > >> On Sun, Mar 10, 2019 at 11:29 AM Driesprong, Fokko
> > <fokko@driesprong.frl
> > > >
> > > >> wrote:
> > > >>
> > > >>> Thanks Kevin for opening the discussion. I think it is important to
> > > >>> have a clear overview of how to approach the AIP.
> > > >>>
> > > >>> First of all, how many DAGs do we have that take 30s to parse? I
> > > >>> consider this bad practice, and it would also result in an unworkable
> > > >>> situation with the current setup of Airflow, since it takes a lot of
> > > >>> resources on the webserver/scheduler and the whole system becomes
> > > >>> unresponsive. It will be hard to cope with such DAGs in general.
> > > >>>
> > > >>> The idea from the AIP is to have a versioned copy of the DAG in the
> > > >>> DB, so in the end you won't need to parse the whole thing every time;
> > > >>> only when you trigger a DAG, or when you want to see the current
> > > >>> status of the DAG.
> > > >>>
> > > >>> As stated earlier, I strongly feel we shouldn't serialize the DAGs
> > > >>> as JSON(5) or pickles in general. For me, this is deferring the pain
> > > >>> of setting up a structure for the DAG object itself.
> > > >>> Having the DAG denormalized in the database gives us cleaner storage
> > > >>> of our DAG. We can, for example, enforce fields by making them not
> > > >>> null, so we know that something is off at write time instead of read
> > > >>> time. Furthermore, with blobs we're missing logical types such as
> > > >>> dates, which we can query efficiently using the indices of the
> > > >>> database.
> > > >>> Also, with all serialization formats, evolution isn't trivial.
> > > >>> Consider the situations when:
> > > >>> - We're introducing a new field, and it might be null, therefore we
> > > >>> need to bake all kinds of logic into the Airflow code, which you
> > > >>> don't want. With proper migration scripts, you could prefill these
> > > >>> fields and make them not null.
> > > >>> - We're changing the model: for example, you still can't change a
> > > >>> string type into an integer without adding custom logic. In this
> > > >>> case, the reviewer needs to be extra careful that no breaking changes
> > > >>> are introduced. Right now we're doing minimal forward- and
> > > >>> backward-compatibility testing.
> > > >>>
> > > >>> In case we get too many migrations, we could also squash (some of)
> > > >>> them when preparing the release.
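
Fokko's write-time-enforcement argument can be made concrete with a tiny schema sketch. Table and column names here are invented for illustration, not the actual Airflow schema; the point is that NOT NULL columns and a real, indexable date column catch problems when writing, not when reading:

```python
import sqlite3

# In-memory schema sketch: typed, NOT NULL columns reject bad writes
# immediately, and a real timestamp column can be indexed and range-queried,
# unlike a date buried inside an opaque JSON/pickle blob.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE serialized_dag (
        dag_id      TEXT NOT NULL,
        fileloc     TEXT NOT NULL,
        last_parsed TIMESTAMP NOT NULL
    )
""")
conn.execute("CREATE INDEX idx_last_parsed ON serialized_dag (last_parsed)")

conn.execute(
    "INSERT INTO serialized_dag VALUES (?, ?, ?)",
    ("example_dag", "/dags/example.py", "2019-03-10 11:29:00"),
)

# A write that violates the schema fails at write time, not at read time.
try:
    conn.execute(
        "INSERT INTO serialized_dag VALUES (NULL, '/dags/x.py', '2019-03-10 00:00:00')"
    )
    failed_at_write = False
except sqlite3.IntegrityError:
    failed_at_write = True
assert failed_at_write
```

With a blob-based format, the same bad row would only surface later, when some consumer tried to deserialize it.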
> > > >>>
> > > >>> Personally, I don't think the serialization is the issue here. As
> > > >>> Max already mentioned, it is finding the optimal balance of
> > > >>> (de)normalization. From the user perspective, the serialization won't
> > > >>> change much of the behaviour of Airflow.
> > > >>>
> > > >>> For me, having `DAG.serialize()` and `DAG.deser(version)` is not the
> > > >>> ideal approach. But it might be that we're on the same page :-) I
> > > >>> believe it should be something like `DagRun.find('fokkos_dag',
> > > >>> datetime(2018, 03, 01))`, which constructs the correct version of the
> > > >>> DAG. Since there is a uniqueness constraint on (dag_id, datetime),
> > > >>> this will always return the same DAG. You will get the versioned
> > > >>> DagRun as it ran at that time. Serializing the fields and storing
> > > >>> them in the database should happen transparently when you update the
> > > >>> DAG object. When you run a DAG, you'll parse the DAG and then run it:
> > > >>> `Dag().create_dagrun(...)` will create a DagRun, as the name
> > > >>> suggests. If that version of the DAG still exists in the database, it
> > > >>> will reuse it; otherwise it will create a new version of the DAG
> > > >>> (with all the operators etc). In this sense the versioning of the
> > > >>> DAGs should be done within the Dag(Run).
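
The lookup Fokko describes can be sketched as a toy store where (dag_id, execution_date) acts as the uniqueness constraint and each DagRun keeps the DAG snapshot it was created with. These classes are hypothetical, not the real Airflow `DagRun` API:

```python
import datetime


class DagRunStore:
    """(dag_id, execution_date) -> DAG snapshot, mimicking a unique constraint."""

    def __init__(self):
        self._runs = {}

    def create_dagrun(self, dag_id, execution_date, dag_snapshot):
        key = (dag_id, execution_date)
        # Uniqueness constraint: an existing run keeps its original snapshot.
        if key in self._runs:
            return self._runs[key]
        self._runs[key] = dag_snapshot
        return dag_snapshot

    def find(self, dag_id, execution_date):
        return self._runs[(dag_id, execution_date)]


store = DagRunStore()
when = datetime.datetime(2018, 3, 1)
store.create_dagrun("fokkos_dag", when, {"tasks": ["a", "b"]})
# A later code change does not rewrite history: the run keeps its snapshot.
store.create_dagrun("fokkos_dag", when, {"tasks": ["a", "b", "c"]})
assert store.find("fokkos_dag", when) == {"tasks": ["a", "b"]}
```

Clearing a run, in this model, would mean deleting the key so the next `create_dagrun` picks up the latest version of the DAG.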
> > > >>>
> > > >>> The versioning will change the behaviour from a user perspective.
> > > >>> Right now we store only a single version. For example, the poor
> > > >>> man's backfilling won't work anymore. This is clearing the state from
> > > >>> past and future, up- and downstream, and letting it catch up again.
> > > >>> In this case, the old version of the DAG won't exist anymore, and
> > > >>> potentially there are tasks that aren't in the code anymore. Here we
> > > >>> need to clear that version of the DAG and rerun it with the latest
> > > >>> version: `DagRun.find('fokkos_dag', datetime(2018, 03, 01)).clear()`.
> > > >>> How we are going to do clears downstream in the middle of the DAG is
> > > >>> something I still have to figure out, because potentially there are
> > > >>> tasks that can't be rerun since the underlying Python code has
> > > >>> changed, both on the user level and on the Airflow level. It will be
> > > >>> impossible to get these features pure in that sense.
> > > >>> I would suggest not adding a new status here to indicate that a task
> > > >>> can't be rerun because it isn't part of the DAG anymore. We have to
> > > >>> find the balance between adding complexity (also to the scheduler)
> > > >>> and features that we need to introduce to help the user.
> > > >>>
> > > >>> Cheers, Fokko
> > > >>>
> > > >>> Ps. Jarek, interesting idea. It shouldn't be too hard to make
> > > >>> Airflow more k8s native. You could package your DAGs within your
> > > >>> container and do a rolling update. Add the DAGs as the last layer,
> > > >>> and then point the DAGs folder to the same location. The hard part
> > > >>> here is that you need to gracefully restart the workers. Currently,
> > > >>> AFAIK, the signals given to the pod aren't respected. So when the
> > > >>> scheduler/webserver/worker receives a SIGTERM, it should stop the
> > > >>> jobs nicely and then exit the container, before k8s kills the
> > > >>> container with a SIGKILL. This will be challenging with the workers,
> > > >>> which are potentially long-running. Maybe stopping the kick-off of
> > > >>> new jobs and letting the old ones finish will be good enough, but
> > > >>> then we need to increase the standard kill timeout substantially.
> > > >>> Having this would also enable autoscaling of the workers.
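
The drain-on-SIGTERM behaviour sketched above (stop picking up new jobs, let in-flight work finish, then exit before the SIGKILL deadline) might look roughly like this in a worker loop. This is an illustration of the pattern, not Airflow's actual signal handling:

```python
import signal


class GracefulWorker:
    """Stop accepting new jobs on SIGTERM; let the current job finish."""

    def __init__(self):
        self.shutting_down = False
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _handle_sigterm(self, signum, frame):
        # Don't exit immediately: just stop picking up new jobs.
        self.shutting_down = True

    def run(self, jobs):
        completed = []
        for job in jobs:
            if self.shutting_down:
                break  # drain: no new jobs are started after SIGTERM
            completed.append(job())
        return completed


worker = GracefulWorker()
jobs = [
    lambda: "job-1",
    # The second job simulates SIGTERM arriving mid-run (e.g. from kubelet).
    lambda: (signal.raise_signal(signal.SIGTERM), "job-2")[1],
    lambda: "job-3",
]
# job-2 still finishes; job-3 is never started.
assert worker.run(jobs) == ["job-1", "job-2"]
```

In k8s terms this is the window between SIGTERM and SIGKILL, so `terminationGracePeriodSeconds` would need to cover the longest expected task, as the mail suggests.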
> > > >>>
> > > >>>
> > > >>>
> > > >>> Op za 9 mrt. 2019 om 19:07 schreef Maxime Beauchemin <
> > > >>> maximebeauchemin@gmail.com>:
> > > >>>
> > > >>>> I want to raise the question of the amount of normalization we want
> > > >>>> to use here, as it seems to be an area that needs more attention.
> > > >>>>
> > > >>>> The AIP suggests having DAG blobs, task blobs and edges (call it
> > > >>>> fairly-normalized). I also like the idea of all-encompassing (call
> > > >>>> it very-denormalized) DAG blobs, as they seem easier to manage in
> > > >>>> terms of versioning. The question here is whether we go with one of
> > > >>>> these methods exclusively, something in-between, or even a hybrid
> > > >>>> approach (redundant blobs that use different levels of
> > > >>>> normalization).
> > > >>>>
> > > >>>> It's nice and simple to just push or pull atomic DAG objects with a
> > > >>>> version stamp on them. It's clearly simpler than dealing with 3
> > > >>>> versioned tables (dag, tasks, edges). There are a lot of pros/cons,
> > > >>>> and they become more apparent with the perspective of very large
> > > >>>> DAGs. If the webserver is building a "task details" page, then with
> > > >>>> the "fairly-normalized" model it can pull just what it needs instead
> > > >>>> of pulling the large DAG blob. Similarly, if building a sub-tree
> > > >>>> view (a subset of the DAG), perhaps it can retrieve only what it
> > > >>>> needs. But if you need the whole DAG (say for the scheduler use
> > > >>>> case), then you're dealing with more complex SQL/ORM operations
> > > >>>> (joins hopefully, or multiple db round trips).
> > > >>>>
> > > >>>> Now maybe the right approach is more something like 2 tables: dag
> > > >>>> and task_details, where edge keys are denormalized into dag
> > > >>>> (arguably that's a few KBs at most, even for large DAGs), and maybe
> > > >>>> the DAG object has most of the high-level task metadata (operator,
> > > >>>> name, key BaseOperator attrs), while task_details has the big blobs
> > > >>>> (SQL code). This is probably a nice compromise; the question becomes
> > > >>>> "how much task-level detail do we store in the DAG-centric blob?",
> > > >>>> probably not much, to keep the DAG objects as small as possible. The
> > > >>>> main downside here is that you cannot have the database join, and
> > > >>>> you have to do 2 round trips to reconstruct a DAG object (fetch the
> > > >>>> DAG, parse the object to get the list of tasks, and then run another
> > > >>>> db query to get those task details).
> > > >>>>
> > > >>>> To summarize, I'd qualify the more normalized approach as the most
> > > >>>> proper, but also the more complex. It'll shine in specific cases
> > > >>>> around large DAGs. If we have the proper abstractions (methods like
> > > >>>> DAG.serialize(), DAG.deser(version)), then I guess that's not an
> > > >>>> issue.
> > > >>>>
> > > >>>> Max
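
One possible reading of Max's two-table compromise, with the edges denormalized into the DAG row and the heavy per-task blobs split out, could look like this. The table and column names are made up for the sketch:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- The dag row keeps the small stuff: edges plus high-level task metadata.
    CREATE TABLE dag (
        dag_id  TEXT PRIMARY KEY,
        edges   TEXT NOT NULL,    -- JSON list of (upstream, downstream) pairs
        tasks   TEXT NOT NULL     -- JSON: task_id -> operator, key attrs
    );
    -- The big blobs (e.g. SQL code) live separately, fetched on demand.
    CREATE TABLE task_details (
        dag_id  TEXT NOT NULL,
        task_id TEXT NOT NULL,
        blob    TEXT NOT NULL,
        PRIMARY KEY (dag_id, task_id)
    );
""")

conn.execute(
    "INSERT INTO dag VALUES (?, ?, ?)",
    (
        "etl",
        json.dumps([["extract", "load"]]),
        json.dumps({"extract": {"operator": "BashOperator"},
                    "load": {"operator": "PostgresOperator"}}),
    ),
)
conn.execute(
    "INSERT INTO task_details VALUES (?, ?, ?)",
    ("etl", "load", "INSERT INTO warehouse SELECT * FROM staging"),
)

# The scheduler/webserver gets the full shape of the DAG without the blobs.
edges = json.loads(
    conn.execute("SELECT edges FROM dag WHERE dag_id='etl'").fetchone()[0]
)
assert edges == [["extract", "load"]]

# A "task details" page is the second round trip, for just the blob it needs.
blob = conn.execute(
    "SELECT blob FROM task_details WHERE dag_id=? AND task_id=?",
    ("etl", "load"),
).fetchone()[0]
assert blob.startswith("INSERT INTO warehouse")
```

This shows the trade-off Max names: no join reconstructs the DAG in one query, but each consumer touches only the data it actually needs.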
> > > >>>>
> > > >>>> On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <yrqls21@gmail.com>
> > wrote:
> > > >>>>
> > > >>>>> Hi Julian, I'm definitely aligned with you guys on making the
> > > >>>>> webserver independent of DAG parsing; the end goal to me would just
> > > >>>>> be to build a complete story around serializing DAGs, and to move
> > > >>>>> with that story in mind. I feel like you guys may already have a
> > > >>>>> list of dynamic features we need to deprecate/change; if that is
> > > >>>>> the case, feel free to open the discussion on what we do to them
> > > >>>>> with DAG serialization.
> > > >>>>>
> > > >>>>> Julian, Ash, Dan, on 2nd thought I do agree that if we can meet the
> > > >>>>> requirements Dan mentioned, it would be nice to have them stored in
> > > >>>>> the DB. Some combined solutions, like having a column of serialized
> > > >>>>> graph in the serialized dag table, can potentially meet all
> > > >>>>> requirements. What format we end up using to represent the DAG
> > > >>>>> between components is now less important IMO: it's fine to refactor
> > > >>>>> the endpoints that only need DagModel to use only DagModel, and
> > > >>>>> easy to do a batch replacement if we decide otherwise later. More
> > > >>>>> important is to define this source of truth for the serialized DAG.
> > > >>>>>
> > > >>>>> Ash, ty for the email list, I'll tune my filters accordingly :D I'm
> > > >>>>> leaning towards having a separate process for the parser, so the
> > > >>>>> parser has no scheduler dependency etc., but we can discuss this in
> > > >>>>> another thread.
> > > >>>>>
> > > >>>>> On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov
> > > >>> <ddavydov@twitter.com.invalid
> > > >>>>>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>>>
> > > >>>>>>> Personally I don't understand why people are pushing for a
> > > >>>>>>> JSON-based DAG representation
> > > >>>>>>
> > > >>>>>> It sounds like you agree that DAGs should be serialized (just in
> > > >>>>>> the DB instead of JSON), so I will only address why JSON is better
> > > >>>>>> than MySQL (i.e. serializing at the DAG level vs the task level)
> > > >>>>>> as far as I can see, and not why we need serialization. If you
> > > >>>>>> zoom out and look at all the use cases of serialized DAGs, e.g.
> > > >>>>>> having the scheduler use them instead of parsing DAGs directly,
> > > >>>>>> then it becomes clear that we need all appropriate metadata in
> > > >>>>>> these DAGs (operator params, DAG properties, etc), in which case
> > > >>>>>> it's not clear how they will fit nicely into a DB table (unless
> > > >>>>>> you wanted to do something like (parent_task_id, task_id,
> > > >>>>>> task_params)). Also keep in mind that we will need to store
> > > >>>>>> different versions of each DAG in the future so that we can ensure
> > > >>>>>> consistency in a DagRun, i.e. each task in a DagRun uses the same
> > > >>>>>> version of a DAG.
> > > >>>>>>
> > > >>>>>> I think some of our requirements should be:
> > > >>>>>> 1. The data model will lead to acceptable performance in all of
> > > >>>>>> its consumers (scheduler, webserver, workers), i.e. no n+1 access
> > > >>>>>> patterns (my biggest concern about serializing at the task level
> > > >>>>>> as you propose vs at the DAG level)
> > > >>>>>> 2. We can have versioning of serialized DAGs
> > > >>>>>> 3. The ability to separate DAGs into their own data store (e.g. no
> > > >>>>>> reliance on joins between the new table and the old one)
> > > >>>>>> 4. One source of truth/serialized representation for DAGs
> > > >>>>>> (currently we have SimpleDAG)
> > > >>>>>>
> > > >>>>>> If we can fulfill all of these requirements and serialize at the
> > > >>>>>> task level rather than the DAG level in the DB, then I agree that
> > > >>>>>> probably makes more sense.
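
Requirement 1, the n+1 access pattern Dan warns about, can be made concrete with a toy query counter: a DAG-level blob is one query for the whole DAG, while naive task-level rows cost one query per task. Schema and names are invented for the illustration:

```python
import json
import sqlite3


class CountingConnection:
    """Wrap sqlite3 to count how many queries a consumer issues."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.queries = 0

    def execute(self, sql, params=()):
        self.queries += 1
        return self.conn.execute(sql, params)


db = CountingConnection()
db.execute("CREATE TABLE dag_blob (dag_id TEXT PRIMARY KEY, payload TEXT)")
db.execute("CREATE TABLE task_row (dag_id TEXT, task_id TEXT, payload TEXT)")

tasks = {f"task_{i}": {"op": "BashOperator"} for i in range(5)}
db.execute("INSERT INTO dag_blob VALUES (?, ?)",
           ("d1", json.dumps({"tasks": tasks})))
for task_id, payload in tasks.items():
    db.execute("INSERT INTO task_row VALUES (?, ?, ?)",
               ("d1", task_id, json.dumps(payload)))

# DAG-level: one query loads everything the scheduler needs.
db.queries = 0
dag = json.loads(
    db.execute("SELECT payload FROM dag_blob WHERE dag_id='d1'").fetchone()[0]
)
assert db.queries == 1 and len(dag["tasks"]) == 5

# Task-level, done naively: one query per task -- the n+1 pattern.
db.queries = 0
loaded = {t: json.loads(db.execute(
    "SELECT payload FROM task_row WHERE dag_id=? AND task_id=?", ("d1", t)
).fetchone()[0]) for t in tasks}
assert db.queries == len(tasks) and len(loaded) == 5
```

A task-level design can of course batch-fetch with one `WHERE dag_id=?` query; the requirement is that the data model makes that the natural access path.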
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> In the proposed PRs we (Peter, Bas and me) aim to avoid
> > > >>>>>>> re-parsing DAG files by querying all the required information
> > > >>>>>>> from the database. In one or two cases this may not be possible,
> > > >>>>>>> in which case we might either have to fall back on the DAG file
> > > >>>>>>> or add the missing information into the database. We can tackle
> > > >>>>>>> these problems as we encounter them.
> > > >>>>>>
> > > >>>>>> I think you would have the support of many of the committers in
> > > >>>>>> removing any use cases that stand in the way of full
> > > >>>>>> serialization. That being said, if we need to remove features, we
> > > >>>>>> need to do it carefully and thoughtfully, and ideally with
> > > >>>>>> proposed alternatives/workarounds to cover the removals.
> > > >>>>>>
> > > >>>>>> The counter argument: this PR removes the need for the confusing
> > > >>>>>> "Refresh" button from the UI, and in general you only pay the
> > > >>>>>> cost for the expensive DAGs when you ask about them. (I don't
> > > >>>>>> know what/when we call the /pickle_info endpoint off the top of my
> > > >>>>>> head)
> > > >>>>>>
> > > >>>>>> Probably worth splitting out into a separate thread, but I'm
> > > >>>>>> actually not sure the refresh button does anything; I think we
> > > >>>>>> should double check... I think about 2 years ago a commit was made
> > > >>>>>> that made gunicorn webservers automatically rotate underneath
> > > >>>>>> Flask (each one would reparse the DagBag). Even if it works, we
> > > >>>>>> should probably remove it, since the webserver refresh interval is
> > > >>>>>> pretty fast, and it just causes confusion for users and implies
> > > >>>>>> that the DAGs are not refreshed automatically.
> > > >>>>>>
> > > >>>>>> Do you mean https://json5.org/ or is this a typo? That might be
> > > >>>>>> okay for a nicer user front end, but the "canonical" version
> > > >>>>>> stored in the DB should be something "plainer" like just JSON.
> > > >>>>>>
> > > >>>>>> I think he got this from my reply, and it was just an example,
> > > >>>>>> but you are right, I agree JSON would be better than JSON5.
> > > >>>>>>
> > > >>>>>> On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <
> ash@apache.org>
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Comments inline.
> > > >>>>>>>
> > > >>>>>>>> On 8 Mar 2019, at 11:28, Kevin Yang <yrqls21@gmail.com>
> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi all,
> > > >>>>>>>> When I was preparing some work related to this AIP I found
> > > >>>>>>>> something very concerning. I noticed this JIRA ticket <
> > > >>>>>>>> https://issues.apache.org/jira/browse/AIRFLOW-3562> is trying to
> > > >>>>>>>> remove the dependency on DagBag from the webserver, which is
> > > >>>>>>>> awesome; we wanted it badly but never got to start working on
> > > >>>>>>>> it. However, when I looked at some of its subtasks, which try to
> > > >>>>>>>> remove the DagBag dependency from each endpoint, I found the way
> > > >>>>>>>> we remove the dependency is not very ideal. For example, this PR
> > > >>>>>>>> <https://github.com/apache/airflow/pull/4867/files> will require
> > > >>>>>>>> us to parse the DAG file each time we hit the endpoint.
> > > >>>>>>>
> > > >>>>>>> The counter argument: this PR removes the need for the confusing
> > > >>>>>>> "Refresh" button from the UI, and in general you only pay the
> > > >>>>>>> cost for the expensive DAGs when you ask about them. (I don't
> > > >>>>>>> know what/when we call the /pickle_info endpoint off the top of
> > > >>>>>>> my head)
> > > >>>>>>>
> > > >>>>>>> This endpoint may be one to hold off on (as it can ask for
> > > >>>>>>> multiple dags), but there are some that definitely don't need a
> > > >>>>>>> full DagBag or to even parse the DAG file; the current DAG model
> > > >>>>>>> has enough info.
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> If we go down this path, we indeed can get rid of the DagBag
> > > >>>>>>>> dependency easily, but we will have to 1. increase the DB load
> > > >>>>>>>> (not too concerning at the moment), 2. wait for the DAG file to
> > > >>>>>>>> be parsed before getting the page back, potentially multiple
> > > >>>>>>>> times. A DAG file can sometimes take quite a while to parse;
> > > >>>>>>>> e.g. we have some framework DAG files generating a large number
> > > >>>>>>>> of DAGs from static config files or even jupyter notebooks, and
> > > >>>>>>>> they can take 30+ seconds to parse. Yes, we don't like large DAG
> > > >>>>>>>> files, but people do see the beauty of code as config and
> > > >>>>>>>> sometimes heavily abuse/leverage it. Even assuming all users
> > > >>>>>>>> have nice small Python files that can be parsed fast, I'm still
> > > >>>>>>>> a bit worried about this approach. Continuing on this path means
> > > >>>>>>>> we've chosen DagModel to be the serialized representation of a
> > > >>>>>>>> DAG and DB columns to hold its different properties; it can be
> > > >>>>>>>> one candidate, but I don't know if we should settle on that now.
> > > >>>>>>>> I would personally prefer something more compact, e.g. JSON5,
> > > >>>>>>>> and an easy-to-scale representation (such that serializing new
> > > >>>>>>>> fields != DB upgrade).
> > > >>>>>>>
> > > >>>>>>> Do you mean https://json5.org/ or is this a typo? That might be
> > > >>>>>>> okay for a nicer user front end, but the "canonical" version
> > > >>>>>>> stored in the DB should be something "plainer" like just JSON.
> > > >>>>>>>
> > > >>>>>>> I'm not sure that "serializing new fields != DB upgrade" is that
> > > >>>>>>> big of a concern, as we don't add fields that often. One possible
> > > >>>>>>> way of dealing with it, if we do, is to have a hybrid approach:
> > > >>>>>>> a few distinct columns, but then a JSON blob. (And if we were
> > > >>>>>>> only to support postgres we could just use JSONb. But I think our
> > > >>>>>>> friends at Google may object ;) )
> > > >>>>>>>
> > > >>>>>>> Adding a new column in a DB migration with a default NULL
> > > >>>>>>> shouldn't be an expensive operation, or difficult to achieve.
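
Ash's hybrid idea (a few real columns plus a JSON blob, so most new serialized fields don't force a migration) could look like this sketch. Column names are invented, and the blob is plain TEXT so it is not tied to Postgres JSONb:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# A few distinct, queryable columns; everything else rides in the JSON blob.
conn.execute("""
    CREATE TABLE dag_serialized (
        dag_id     TEXT PRIMARY KEY,
        is_paused  INTEGER NOT NULL,
        data       TEXT NOT NULL      -- JSON blob: schema-less extras
    )
""")

conn.execute(
    "INSERT INTO dag_serialized VALUES (?, ?, ?)",
    ("example", 0, json.dumps({"schedule_interval": "@daily",
                               "tags": ["etl"]})),
)

# Adding a new serialized field is just a new key in the blob: no ALTER TABLE.
data = json.loads(conn.execute(
    "SELECT data FROM dag_serialized WHERE dag_id='example'").fetchone()[0])
data["default_view"] = "graph"
conn.execute("UPDATE dag_serialized SET data=? WHERE dag_id='example'",
             (json.dumps(data),))

stored = json.loads(conn.execute(
    "SELECT data FROM dag_serialized WHERE dag_id='example'").fetchone()[0])
assert stored["default_view"] == "graph" and stored["tags"] == ["etl"]
```

The trade-off is the one discussed in the thread: fields inside the blob lose DB-level typing, NOT NULL enforcement, and plain indexing, so anything that needs those should graduate to a real column via a migration.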
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> In my imagination we would have to collect the list of dynamic
> > > >>>>>>>> features depending on unserializable fields of a DAG and start a
> > > >>>>>>>> discussion/vote on dropping support for them (I'm working on
> > > >>>>>>>> this, but if anyone has already done so please take over),
> > > >>>>>>>> decide on the serialized representation of a DAG, and then
> > > >>>>>>>> replace DagBag with it in the webserver. Per previous discussion
> > > >>>>>>>> and some offline discussions with Dan, one future of DAG
> > > >>>>>>>> serialization that I like would look similar to this:
> > > >>>>>>>>
> > > >>>>>>>> https://imgur.com/ncqqQgc
> > > >>>>>>>
> > > >>>>>>> Something I've thought about before for other things was to
> > > >>>>>>> embed an API server _into_ the scheduler - this would be useful
> > > >>>>>>> for k8s healthchecks and native Prometheus metrics without
> > > >>>>>>> needing the statsd bridge, and could have endpoints to get
> > > >>>>>>> information such as this directly.
> > > >>>>>>>
> > > >>>>>>> I was thinking it would be _in_ the scheduler process, using
> > > >>>>>>> either threads (ick. Python's still got a GIL, doesn't it?) or
> > > >>>>>>> async/twisted etc. (not a side-car process like we have with the
> > > >>>>>>> logs webserver for `airflow worker`).
> > > >>>>>>>
> > > >>>>>>> (This is possibly an unrelated discussion, but might be worth
> > > >>>>>>> talking about?)
> > > >>>>>>>
> > > >>>>>>>> We can still discuss/vote on which approach we want to take,
> > > >>>>>>>> but I don't want the door to the above design to be shut right
> > > >>>>>>>> now, or we will have to spend a lot of effort switching paths
> > > >>>>>>>> later.
> > > >>>>>>>>
> > > >>>>>>>> Bas and Peter, I'm very sorry to extend the discussion, but I do
> > > >>>>>>>> think this is tightly related to the AIP and the PRs behind it.
> > > >>>>>>>> And my sincere apology for bringing this up so late (I only pull
> > > >>>>>>>> the open PR list occasionally; if there's a way to subscribe to
> > > >>>>>>>> new PR events I'd love to know how).
> > > >>>>>>>
> > > >>>>>>> It's noisy, but you can subscribe to commits@airflow.apache.org
> > > >>>>>>> (but be warned, this also includes all Jira tickets, edits of
> > > >>>>>>> every comment on github etc.).
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>> Kevin Y
> > > >>>>>>>>
> > > >>>>>>>> On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof <
> > > >>>>>>>> pjrvanthof@gmail.com <mailto:pjrvanthof@gmail.com>> wrote:
> > > >>>>>>>> Hi all,
> > > >>>>>>>>
> > > >>>>>>>> Just some comments on the points Bolke gave in relation to my
> > > >>>>>>>> PR.
> > > >>>>>>>>
> > > >>>>>>>> First of all, the main focus is: making the webserver stateless.
> > > >>>>>>>>
> > > >>>>>>>>> 1) Make the webserver stateless: needs the graph of the
> > > >>>>>>>>> *current* dag
> > > >>>>>>>>
> > > >>>>>>>> This is the main goal, but for this a lot more PRs will be
> > > >>>>>>>> coming once my current one is merged. For edges and the graph
> > > >>>>>>>> view this is covered in my PR already.
> > > >>>>>>>>
> > > >>>>>>>>> 2) Version dags: for consistency mainly and not requiring
> > > >>>>>>>>> parsing of the dag on every loop
> > > >>>>>>>>
> > > >>>>>>>> In my PR the historical graphs will be stored for each DagRun.
> > > >>>>>>>> This means that you can see whether an older DagRun had the same
> > > >>>>>>>> graph structure, even if some tasks do not exist anymore in the
> > > >>>>>>>> current graph. Especially for dynamic DAGs this is very useful.
> > > >>>>>>>>
> > > >>>>>>>>> 3) Make the scheduler not require DAG files. This could be done
> > > >>>>>>>>> if the edges contain all information on when to trigger the
> > > >>>>>>>>> next task. We can then have event-driven dag parsing outside of
> > > >>>>>>>>> the scheduler loop, i.e. by the cli. Storage can also be
> > > >>>>>>>>> somewhere else (git, artifactory, filesystem, whatever).
> > > >>>>>>>>
> > > >>>>>>>> The scheduler is almost untouched in this PR. The only thing
> > > >>>>>>>> added is that the edges are saved to the database; the
> > > >>>>>>>> scheduling itself didn't change. The scheduler still depends on
> > > >>>>>>>> the DAG object.
> > > >>>>>>>>
> > > >>>>>>>>> 4) Fully serialise the dag so it becomes transferable to
> > > >>>>>>>>> workers
> > > >>>>>>>>
> > > >>>>>>>> It's nice to see that people have a lot of ideas about this. But
> > > >>>>>>>> as Fokko already mentioned, this is out of scope for the issue
> > > >>>>>>>> we are trying to solve. I also have some ideas about this, but
> > > >>>>>>>> I'd like to limit this PR/AIP to the webserver.
> > > >>>>>>>>
> > > >>>>>>>> For now my PR solves 1 and 2, and the rest of the behaviour
> > > >>>>>>>> (like scheduling) is untouched.
> > > >>>>>>>>
> > > >>>>>>>> Gr,
> > > >>>>>>>> Peter
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > [image: Polidea] <https://www.polidea.com/>
> >
>
