airflow-dev mailing list archives

From 蒋晓峰 <thanosxnicho...@gmail.com>
Subject Re: [AIP-34] Rewrite SubDagOperator
Date Thu, 18 Jun 2020 01:21:01 GMT
Hi Bin,
    Thanks for the good proposal. One thing I am unsure about: can the
schedule interval of a SubDAG differ from that of its parent DAG? I have
discussed this with Jiajie Zhong. If the SubDagOperator has a different
schedule interval, how will the scheduler schedule the parent DAG?
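
To make the question concrete, here is a minimal sketch in plain Python. It
does not use Airflow at all: `FakeDag` and `mismatched_subdags` are
hypothetical names made up for illustration, and the sketch only flags
subdags whose schedule interval differs from the root's. It says nothing
about what the real scheduler does in that case.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeDag:
    """Hypothetical stand-in for a DAG; this is not the Airflow class."""
    dag_id: str
    schedule_interval: Optional[str]
    subdags: List["FakeDag"] = field(default_factory=list)


def mismatched_subdags(parent: FakeDag) -> List[str]:
    """Return ids of (nested) subdags whose schedule differs from the root's."""
    mismatched = []
    stack = list(parent.subdags)
    while stack:
        child = stack.pop()
        if child.schedule_interval != parent.schedule_interval:
            mismatched.append(child.dag_id)
        stack.extend(child.subdags)  # also visit nested subdags
    return sorted(mismatched)


parent = FakeDag("parent", "@daily", [
    FakeDag("parent.download", "@daily"),
    FakeDag("parent.report", "@hourly"),  # the case the question is about
])
print(mismatched_subdags(parent))
```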

Regards,
Nicholas Jiang

On Thu, Jun 18, 2020 at 8:04 AM Xinbin Huang <bin.huangxb@gmail.com> wrote:

> Thank you Max, Kaxil, and everyone for the feedback!
>
> I have rethought the concept of subdag and task groups. I think the
> better way to approach this is to entirely remove subdag and introduce the
> concept of TaskGroup, which is a container of tasks along with their
> dependencies *without execution/scheduling logic as a DAG*. The only
> purpose of it is to group a list of tasks, but you still need to add it to
> a DAG for execution.
>
> Here is a small code snippet.
>
> ```
> class TaskGroup:
>     """
>     A TaskGroup contains a group of tasks.
>
>     If default_args is missing, it will take default args from the DAG.
>     """
>     def __init__(self, group_id, default_args=None):
>         pass
>
>
> """
> You can add tasks to a task group similar to adding tasks to a DAG
>
> This can be declared in a separate file from the dag file
> """
> download_group = TaskGroup(group_id='download', default_args=default_args)
> # task1 and task2 are operators defined elsewhere
> download_group.add_task(task1)  # explicit registration
> task2.dag = download_group      # or by assignment, as with a DAG
>
> with download_group:
>     task3 = DummyOperator(task_id='task3')
>
>     [task1, task2] >> task3
>
>
> """Add it to a DAG for execution"""
> with DAG(dag_id='start_download_dag', default_args=default_args,
>          schedule_interval='@daily', ...) as dag:
>     start = DummyOperator(task_id='start')
>     start >> download_group
>     # this is equivalent to
>     # start >> [task1, task2] >> task3
> ```
>
> With this, we can still reuse a group of tasks and set dependencies between
> them; it avoids the boilerplate code from using SubDagOperator, and we can
> declare dependencies as `task >> task_group >> task`.
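
As a rough illustration of how `task >> task_group >> task` could expand into
plain task-to-task dependencies, here is a self-contained sketch. It is not
Airflow code and not the AIP-34 implementation: `Node`, `Task`, `TaskGroup`,
`roots()` and `leaves()` are hypothetical names, and the rule "connect to the
group's root tasks on the way in and its leaf tasks on the way out" is an
assumption.

```python
class Node:
    """Shared `>>` behaviour for single tasks and groups (hypothetical)."""

    def roots(self):
        raise NotImplementedError

    def leaves(self):
        raise NotImplementedError

    def __rshift__(self, other):
        # Wire every leaf on the left to every root on the right.
        for up in self.leaves():
            for down in other.roots():
                up.downstream.add(down.task_id)
                down.upstream.add(up.task_id)
        return other  # returning the right-hand side allows a >> b >> c


class Task(Node):
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def roots(self):
        return [self]

    def leaves(self):
        return [self]


class TaskGroup(Node):
    """A plain container: no schedule, no execution logic of its own."""

    def __init__(self, group_id):
        self.group_id = group_id
        self.tasks = []

    def add_task(self, task):
        self.tasks.append(task)
        return task

    def roots(self):
        # Tasks with no upstream dependency inside the group.
        ids = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not (t.upstream & ids)]

    def leaves(self):
        # Tasks with no downstream dependency inside the group.
        ids = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not (t.downstream & ids)]


group = TaskGroup("download")
task1, task2, task3 = (group.add_task(Task(f"task{i}")) for i in (1, 2, 3))
task1 >> task3
task2 >> task3

start, end = Task("start"), Task("end")
start >> group >> end
print(sorted(start.downstream), sorted(end.upstream))
```

In this sketch the roots and leaves are computed when `>>` is evaluated, so
dependencies inside the group have to be declared before the group is wired
to outside tasks.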
>
> In terms of user migration, we can introduce it before Airflow 2.0 and allow
> a gradual transition. Then we can decide if we still want to keep the
> SubDagOperator or simply remove it.
>
> Any thoughts?
>
> Cheers,
> Bin
>
>
> On Wed, Jun 17, 2020 at 7:37 AM Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
>
> > +1, proposal looks good.
> >
> > The original intention was really to have task groups and a zoom-in/out in
> > the UI. The original reasoning was to reuse the DAG object since it is a
> > group of tasks, but as highlighted here it does create underlying
> > confusion, since a DAG is much more than just a group of tasks.
> >
> > Max
> >
> > On Mon, Jun 15, 2020 at 2:43 AM Poornima Joshi <joshipoornima06@gmail.com> wrote:
> >
> > > Thank you for your email.
> > >
> > > On Sat, Jun 13, 2020 at 12:18 AM Xinbin Huang <bin.huangxb@gmail.com>
> > > wrote:
> > >
> > > > > >   - *Unpack SubDags during dag parsing*: This rewrites the
> > > > > >   *DagBag.bag_dag* method to unpack subdag while parsing, and it
> > > > > >   will give a flat structure at the task level
> > > > >
> > > > > The serialized_dag representation already does this I think. At
> > > > > least if I've understood your idea here correctly.
> > > >
> > > > I am not sure about the serialized_dag representation, but at least it
> > > > will still keep the subdag entry in the DAG table? In my proposal, as
> > > > also in the draft PR, the idea is to *extract the tasks from the subdag
> > > > and add them back to the root_dag*. So the runtime DAG graph will look
> > > > exactly the same as without subdag, but with metadata attached to those
> > > > sections. This metadata will later be used for rendering in the UI. So
> > > > after parsing (*DagBag.process_file()*), it will just output the
> > > > *root_dag* instead of *root_dag + subdag + subdag + nested subdag* etc.
> > > >
> > > >    - e.g. section-1-* will have metadata current_group=section-1,
> > > >    parent_group=<the-root-dag-id> (naming suggestions welcome). The
> > > >    reason for parent_group is that we can have nested groups and still
> > > >    be able to capture the dependency.
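
The flattening step described above can be sketched in plain Python. This is
only an illustration under stated assumptions, not the draft PR: `Task`,
`Dag` and `flatten` are hypothetical stand-ins, root-level tasks are assumed
to keep both attributes unset, and task dependencies are left out.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Task:
    task_id: str
    current_group: Optional[str] = None  # group the task belongs to
    parent_group: Optional[str] = None   # group (or root dag) one level up


@dataclass
class Dag:
    dag_id: str
    tasks: List[Task] = field(default_factory=list)
    subdags: List["Dag"] = field(default_factory=list)


def flatten(dag: Dag, parent_group: Optional[str] = None) -> List[Task]:
    """Return all tasks as one flat list, tagging tasks taken from subdags."""
    flat = []
    for task in dag.tasks:
        if parent_group is not None:  # only tasks that came from a subdag
            task.current_group = dag.dag_id
            task.parent_group = parent_group
        flat.append(task)
    for sub in dag.subdags:
        flat.extend(flatten(sub, parent_group=dag.dag_id))
    return flat


root = Dag("root", [Task("start")], [
    Dag("section-1", [Task("section-1-a")], [
        Dag("inner", [Task("inner-x")]),
    ]),
])
for t in flatten(root):
    print(t.task_id, t.current_group, t.parent_group)
```

Following the parent_group chain from any task recovers the nesting, which
is the information a UI would need to collapse or expand a section.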
> > > >
> > > > Runtime DAG:
> > > > [image: image.png]
> > > >
> > > > While at the UI, what we see would be something like this by utilizing
> > > > the metadata, and then we can expand or zoom in in some way.
> > > > [image: image.png]
> > > >
> > > > The benefits I can see are:
> > > > 1. We don't need to deal with the extra complexity of SubDag for
> > > > execution and scheduling. It will be the same as not using SubDag.
> > > > 2. We still have the benefits of modularized and reusable dag code and
> > > > can declare dependencies between them. And with the new SubDagOperator
> > > > (see AIP or draft PR), we can use the same dag_factory function for
> > > > generating 1 dag, a lot of dynamic dags, or a SubDag (in this case, it
> > > > will just extract all underlying tasks and append them to the root dag).
> > > >
> > > >    - Then it gets to the idea of replacing subdag with a simpler
> > > >    concept by Ash: the proposed change basically drains out the contents
> > > >    of a SubDag and becomes more like
> > > >    ExtractSubdagTasksAndAppendToRootdagOperator (forgive me for the
> > > >    crazy name). In this case, is it still necessary to keep the concept
> > > >    of subdag, as it is nothing more than a name?
> > > >
> > > > That's why the TaskGroup idea comes up. Thanks to Chris Palmer for
> > > > helping conceptualize the functionality of TaskGroup; I will just paste
> > > > it here.
> > > >
> > > > >   - Tasks can be added to a TaskGroup
> > > > >   - You *can* have dependencies between Tasks in the same TaskGroup,
> > > > >   but *cannot* have dependencies between a Task in a TaskGroup and
> > > > >   either a Task in a different TaskGroup or a Task not in any group
> > > > >   - You *can* have dependencies between a TaskGroup and either other
> > > > >   TaskGroups or Tasks not in any group
> > > > >   - The UI will by default render a TaskGroup as a single "object",
> > > > >   but which you expand or zoom into in some way
> > > > >   - You'd need some way to determine what the "status" of a TaskGroup
> > > > >   was at least for UI display purposes
> > > >
> > > > I agree with Chris:
> > > > - From the backend's view (scheduler & executor), I think TaskGroup
> > > > should be ignored during execution (unless we decide to implement some
> > > > metadata operations that allow starting/stopping a group of tasks etc.).
> > > > - From the UI's view, it should be able to pick up the individual
> > > > tasks' status and then determine the TaskGroup's status.
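
One possible way to derive a group's status from its tasks' statuses is
sketched below in plain Python. The state names, the precedence order, and
the `group_status` function are assumptions made for illustration; the thread
does not specify how the UI would actually compute this.

```python
from typing import Iterable

# Assumed precedence: the first of these states found among the tasks wins.
PRECEDENCE = ("failed", "running", "queued")


def group_status(task_states: Iterable[str]) -> str:
    """Collapse individual task states into one display state for the group."""
    states = list(task_states)
    if not states:
        return "no_status"  # empty group: nothing to report
    for state in PRECEDENCE:
        if state in states:
            return state
    if all(s == "success" for s in states):
        return "success"
    return "pending"  # some tasks have not started yet


print(group_status(["success", "failed", "running"]))
print(group_status(["success", "success"]))
```

Because the status is computed from task states at display time, nothing
about the group has to be stored or tracked by the scheduler.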
> > > >
> > > > Bin
> > > >
> > > > On Fri, Jun 12, 2020 at 10:28 AM Daniel Imberman <daniel.imberman@gmail.com> wrote:
> > > >
> > > >> I hadn’t thought about using the `>>` operator to tie dags together,
> > > >> but I think that sounds pretty great! I wonder if we could essentially
> > > >> write in the ability to set dependencies to all starter-tasks for that
> > > >> DAG.
> > > >>
> > > >> I’m personally ok with SubDag being a mostly UI concept. It doesn’t
> > > >> need to execute separately; you’re just adding more tasks to the queue
> > > >> that will be executed when there are resources available.
> > > >>
> > > >> On Fri, Jun 12, 2020 at 9:45 AM, Chris Palmer <chris@crpalmer.com> wrote:
> > > >> I agree that SubDAGs are an overly complex abstraction. I think what
> > > >> is needed/useful is a TaskGroup concept. On a high level I think you
> > > >> want this functionality:
> > > >>
> > > >> - Tasks can be added to a TaskGroup
> > > >> - You *can* have dependencies between Tasks in the same TaskGroup,
> > > >> but *cannot* have dependencies between a Task in a TaskGroup and
> > > >> either a Task in a different TaskGroup or a Task not in any group
> > > >> - You *can* have dependencies between a TaskGroup and either other
> > > >> TaskGroups or Tasks not in any group
> > > >> - The UI will by default render a TaskGroup as a single "object",
> > > >> but which you expand or zoom into in some way
> > > >> - You'd need some way to determine what the "status" of a TaskGroup
> > > >> was at least for UI display purposes
> > > >>
> > > >> Not sure if it would need to be a top level object with its own
> > > >> database table and model or just another attribute on tasks. I think
> > > >> you could build it in a way such that from the scheduler's point of
> > > >> view a DAG with TaskGroups doesn't get treated any differently. So it
> > > >> really just becomes a shortcut for setting dependencies between sets
> > > >> of Tasks, and allows the UI to simplify the render of the DAG
> > > >> structure.
> > > >>
> > > >> Chris
> > > >>
> > > >> On Fri, Jun 12, 2020 at 12:12 PM Dan Davydov <ddavydov@twitter.com.invalid> wrote:
> > > >>
> > > >> > Agree with James (and think it's actually the more important issue
> > > >> > to fix), but I am still convinced Ash's idea is the right way
> > > >> > forward (it just might require a bit more work to deprecate than
> > > >> > adding visual grouping in the UI).
> > > >> >
> > > >> > There was a previous thread about this, FYI, with more context on
> > > >> > why subdags are bad and potential solutions:
> > > >> > https://www.mail-archive.com/dev@airflow.apache.org/msg01202.html .
> > > >> > A solution I outline there to James's problem is e.g. enabling the
> > > >> > `>>` operator for Airflow operators to work with DAGs as well. I see
> > > >> > this being separate from Ash's solution for DAG grouping in the UI,
> > > >> > but one of the two items required to replace all existing subdag
> > > >> > functionality.
> > > >> >
> > > >> > I've been working with subdags for 3 years and they are always a
> > > >> > giant pain to use. They are a constant source of user confusion and
> > > >> > breakages during upgrades. Would love to see them gone :).
> > > >> >
> > > >> > On Fri, Jun 12, 2020 at 11:11 AM James Coder <jcoder01@gmail.com> wrote:
> > > >> >
> > > >> > > I'm not sure I totally agree it's just a UI concept. I use the
> > > >> > > subdag operator to simplify dependencies too. If you have a group
> > > >> > > of tasks that need to finish before another group of tasks start,
> > > >> > > using a subdag is a pretty quick way to set those dependencies,
> > > >> > > and I think it also makes it easier to follow the dag code.
> > > >> > >
> > > >> > > On Fri, Jun 12, 2020 at 9:53 AM Kyle Hamlin <hamlin.kn@gmail.com> wrote:
> > > >> > >
> > > >> > > > I second Ash’s grouping concept.
> > > >> > > >
> > > >> > > > On Fri, Jun 12, 2020 at 5:10 AM Ash Berlin-Taylor <ash@apache.org> wrote:
> > > >> > > >
> > > >> > > > > Question:
> > > >> > > > >
> > > >> > > > > Do we even need the SubDagOperator anymore?
> > > >> > > > >
> > > >> > > > > Would removing it entirely and just replacing it with a UI
> > > >> > > > > grouping concept be conceptually simpler, less to get wrong,
> > > >> > > > > and closer to what users actually want to achieve with subdags?
> > > >> > > > >
> > > >> > > > > With your proposed change, tasks in subdags could start running
> > > >> > > > > in parallel (a good change) -- so should we not also just
> > > >> > > > > _entirely_ remove the concept of a sub dag and replace it with
> > > >> > > > > something simpler?
> > > >> > > > >
> > > >> > > > > Problems with subdags (I think. I haven't used them extensively
> > > >> > > > > so may be wrong on some of these):
> > > >> > > > > - They need their own dag_id, but it has(?) to be of the form
> > > >> > > > > `parent_dag_id.subdag_id`.
> > > >> > > > > - They need their own schedule_interval, but it has to match
> > > >> > > > > the parent dag.
> > > >> > > > > - Sub dags can be paused on their own. (Does it make sense to
> > > >> > > > > do this? Pausing just a sub dag would mean the sub dag would
> > > >> > > > > never execute, so the SubDagOperator would fail too.)
> > > >> > > > > - You had to choose the executor to operate a subdag with --
> > > >> > > > > always a bit of a kludge.
> > > >> > > > >
> > > >> > > > > Thoughts?
> > > >> > > > >
> > > >> > > > > -ash
> > > >> > > > >
> > > >> > > > > On Jun 12 2020, at 12:01 pm, Ash Berlin-Taylor <ash@apache.org> wrote:
> > > >> > > > >
> > > >> > > > > > Work on sub-dags is much needed, I'm excited to see how this
> > > >> > > > > > progresses.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >> - *Unpack SubDags during dag parsing*: This rewrites the
> > > >> > > > > >> *DagBag.bag_dag* method to unpack subdag while parsing, and
> > > >> > > > > >> it will give a flat structure at the task level
> > > >> > > > > >
> > > >> > > > > > The serialized_dag representation already does this I think.
> > > >> > > > > > At least if I've understood your idea here correctly.
> > > >> > > > > >
> > > >> > > > > > -ash
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Jun 12 2020, at 9:51 am, Xinbin Huang <bin.huangxb@gmail.com> wrote:
> > > >> > > > > >
> > > >> > > > > >> Hi everyone,
> > > >> > > > > >>
> > > >> > > > > >> I am sending this message to collect feedback on AIP-34, on
> > > >> > > > > >> rewriting SubDagOperator. This was briefly mentioned in the
> > > >> > > > > >> discussion about what needs to be done for Airflow 2.0, and
> > > >> > > > > >> one of the ideas is to make SubDagOperator attach tasks back
> > > >> > > > > >> to the root DAG.
> > > >> > > > > >>
> > > >> > > > > >> AIP-34 focuses on solving SubDagOperator-related issues by
> > > >> > > > > >> reattaching all tasks back to the root dag while respecting
> > > >> > > > > >> dependencies during parsing. The original grouping effect on
> > > >> > > > > >> the UI will be achieved through grouping related tasks by
> > > >> > > > > >> metadata.
> > > >> > > > > >>
> > > >> > > > > >> This also makes the dag_factory function more reusable because
> > > >> > > > > >> you don't need to have parent_dag_name and child_dag_name in
> > > >> > > > > >> the function signature anymore.
> > > >> > > > > >>
> > > >> > > > > >> Changes proposed:
> > > >> > > > > >>
> > > >> > > > > >> - *Unpack SubDags during dag parsing*: This rewrites the
> > > >> > > > > >> *DagBag.bag_dag* method to unpack subdag while parsing, and
> > > >> > > > > >> it will give a flat structure at the task level
> > > >> > > > > >> - *Simplify SubDagOperator*: The new SubDagOperator acts like
> > > >> > > > > >> a container and most of the original methods are removed. The
> > > >> > > > > >> signature is also changed to *subdag_factory* with
> > > >> > > > > >> *subdag_args* and *subdag_kwargs*. This is similar to the
> > > >> > > > > >> PythonOperator signature.
> > > >> > > > > >> - *Add a TaskGroup model and add current_group & parent_group
> > > >> > > > > >> attributes to BaseOperator*: This metadata is used to group
> > > >> > > > > >> tasks for rendering at UI level. It may potentially extend
> > > >> > > > > >> further to group arbitrary tasks outside the context of subdag
> > > >> > > > > >> to allow group-level operations (i.e. stop/trigger a group of
> > > >> > > > > >> tasks within the dag)
> > > >> > > > > >> - *Webserver UI for SubDag*: Proposed UI modification to allow
> > > >> > > > > >> (un)collapsing a group of tasks for a flat structure, to pair
> > > >> > > > > >> with the first change instead of the original hierarchical
> > > >> > > > > >> structure.
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >> Please see related documents and PRs for details:
> > > >> > > > > >> AIP: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-34+Rewrite+SubDagOperator
> > > >> > > > > >>
> > > >> > > > > >> Original Issue: https://github.com/apache/airflow/issues/8078
> > > >> > > > > >> Draft PR: https://github.com/apache/airflow/pull/9243
> > > >> > > > > >>
> > > >> > > > > >> Please let me know if there are any aspects that you
> > > >> > > > > >> agree/disagree with or that need more clarification
> > > >> > > > > >> (especially the third change regarding TaskGroup). Any comments
> > > >> > > > > >> are welcome and I am looking forward to them!
> > > >> > > > > >>
> > > >> > > > > >> Cheers
> > > >> > > > > >> Bin
> > > >> > > > > >>
> > > >> > > > >
> > > >> > > > --
> > > >> > > > Kyle Hamlin
> > > >> > > >
> > > >> > >
> > > >> >
> > > >
> > > >
> > >
> > > --
> > > Thanks & Regards
> > >  Poornima
> > >
> >
>
