airflow-dev mailing list archives

From Xinbin Huang <bin.huan...@gmail.com>
Subject Re: [AIP-34] Rewrite SubDagOperator
Date Thu, 18 Jun 2020 02:30:02 GMT
Hi Nicholas,

I am not sure how the old SubDagOperator behaves in that case; maybe it
throws an error? In the original proposal, though, the subdag's
schedule_interval is ignored. And if we decide to use TaskGroup to replace
SubDag, there will be no subdag schedule_interval at all.
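
To illustrate the TaskGroup option, here is a minimal sketch of a group that is only a container of tasks, so there is no place for a second schedule_interval. The Node, Task and TaskGroup classes below are hypothetical toy stand-ins written for this example; they are not Airflow classes and not the API from the draft PR.

```python
# Toy model only: Node, Task and TaskGroup are hypothetical stand-ins,
# not Airflow classes.

class Node:
    """Anything that can appear on either side of `>>`."""

    def __rshift__(self, other):
        # Wire every leaf of the left side to every root of the right side.
        for up in self.leaves():
            for down in other.roots():
                up.downstream.add(down.task_id)
                down.upstream.add(up.task_id)
        return other  # returning the right side allows chaining: a >> b >> c


class Task(Node):
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must finish first
        self.downstream = set()  # ids of tasks that wait for this one

    def roots(self):
        return [self]

    def leaves(self):
        return [self]


class TaskGroup(Node):
    """A named container of tasks. It deliberately has no schedule_interval."""

    def __init__(self, group_id):
        self.group_id = group_id
        self.tasks = []

    def add(self, task):
        self.tasks.append(task)
        return task

    def _ids(self):
        return {t.task_id for t in self.tasks}

    def roots(self):
        # Tasks with no upstream dependency inside the group.
        return [t for t in self.tasks if not (t.upstream & self._ids())]

    def leaves(self):
        # Tasks with no downstream dependency inside the group.
        return [t for t in self.tasks if not (t.downstream & self._ids())]


download = TaskGroup("download")
task1 = download.add(Task("task1"))
task2 = download.add(Task("task2"))
task3 = download.add(Task("task3"))
task1 >> task3
task2 >> task3

start = Task("start")
end = Task("end")
start >> download >> end  # expands to plain task-to-task dependencies
```

Since the group only expands into ordinary task dependencies, scheduling stays entirely with whatever DAG the tasks end up in.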

Bin

On Wed, Jun 17, 2020 at 6:21 PM 蒋晓峰 <thanosxnicholas@gmail.com> wrote:

> Hi Bin,
>     Thanks for your good proposal. I am unsure whether the schedule
> interval of a SubDAG can differ from that of the parent DAG. I have
> discussed the SubDAG schedule interval with Jiajie Zhong. If the
> SubDagOperator has a different schedule interval, how will the scheduler
> schedule the parent DAG?
>
> Regards,
> Nicholas Jiang
>
> On Thu, Jun 18, 2020 at 8:04 AM Xinbin Huang <bin.huangxb@gmail.com>
> wrote:
>
> > Thank you Max, Kaxil, and everyone for the feedback!
> >
> > I have rethought the concept of subdag and task groups. I think the
> > better way to approach this is to remove subdag entirely and introduce the
> > concept of TaskGroup, which is a container of tasks along with their
> > dependencies *without execution/scheduling logic as a DAG*. Its only
> > purpose is to group a list of tasks, but you still need to add it to
> > a DAG for execution.
> >
> > Here is a small code snippet.
> >
> > ```
> > class TaskGroup:
> >     """
> >     A TaskGroup contains a group of tasks.
> >
> >     If default_args is missing, it will take default args from the DAG.
> >     """
> >     def __init__(self, group_id, default_args=None):
> >         pass
> >
> >
> > """
> > You can add tasks to a task group similar to adding tasks to a DAG
> >
> > This can be declared in a separate file from the dag file
> > """
> > download_group = TaskGroup(group_id='download', default_args=default_args)
> > download_group.add_task(task1)
> > task2.dag = download_group
> >
> > with download_group:
> >     task3 = DummyOperator(task_id='task3')
> >
> >     [task1, task2] >> task3
> >
> >
> > """Add it to a DAG for execution"""
> > with DAG(dag_id='start_download_dag', default_args=default_args,
> >          schedule_interval='@daily', ...) as dag:
> >     start = DummyOperator(task_id='start')
> >     start >> download_group
> >     # this is equivalent to
> >     # start >> [task1, task2] >> task3
> > ```
> >
> > With this, we can still reuse a group of tasks and set dependencies between
> > them; it avoids the boilerplate code from using SubDagOperator, and we can
> > declare dependencies as `task >> task_group >> task`.
> >
> > For user migration, we can introduce it before Airflow 2.0 and allow a
> > gradual transition. Then we can decide if we still want to keep the
> > SubDagOperator or simply remove it.
> >
> > Any thoughts?
> >
> > Cheers,
> > Bin
> >
> >
> > On Wed, Jun 17, 2020 at 7:37 AM Maxime Beauchemin <
> > maximebeauchemin@gmail.com> wrote:
> >
> > > +1, proposal looks good.
> > >
> > > The original intention was really to have task groups and a zoom-in/out in
> > > the UI. The original reasoning was to reuse the DAG object since it is a
> > > group of tasks, but as highlighted here it does create underlying
> > > confusion since a DAG is much more than just a group of tasks.
> > >
> > > Max
> > >
> > > On Mon, Jun 15, 2020 at 2:43 AM Poornima Joshi <joshipoornima06@gmail.com>
> > > wrote:
> > >
> > > > Thank you for your email.
> > > >
> > > > On Sat, Jun 13, 2020 at 12:18 AM Xinbin Huang <bin.huangxb@gmail.com>
> > > > wrote:
> > > >
> > > > > > >   - *Unpack SubDags during dag parsing*: This rewrites the
> > > > > > >   *DagBag.bag_dag* method to unpack subdag while parsing, and it
> > > > > > >   will give a flat structure at the task level
> > > > > >
> > > > > > The serialized_dag representation already does this I think. At
> > > > > > least if I've understood your idea here correctly.
> > > > >
> > > > > I am not sure about the serialized_dag representation, but I think it
> > > > > will at least still keep the subdag entry in the DAG table? In my
> > > > > proposal, as also in the draft PR, the idea is to *extract the tasks
> > > > > from the subdag and add them back to the root_dag*. So the runtime DAG
> > > > > graph will look exactly the same as without subdag, but with metadata
> > > > > attached to those sections. This metadata will later be used for
> > > > > rendering in the UI. So after parsing (*DagBag.process_file()*), it
> > > > > will just output the *root_dag* instead of *root_dag + subdag + subdag +
> > > > > nested subdag* etc.
> > > > >
> > > > >    - e.g. section-1-* will have metadata current_group=section-1,
> > > > >    parent_group=<the-root-dag-id> (naming suggestions welcome). The
> > > > >    reason for parent_group is that we can have nested groups and still
> > > > >    be able to capture the dependency.
> > > > >
> > > > > Runtime DAG:
> > > > > [image: image.png]
> > > > >
> > > > > In the UI, by utilizing the metadata, what we see would be something
> > > > > like this, and then we can expand or zoom into it in some way.
> > > > > [image: image.png]
> > > > >
> > > > > The benefits I can see are:
> > > > > 1. We don't need to deal with the extra complexity of SubDag for
> > > > > execution and scheduling. It will be the same as not using SubDag.
> > > > > 2. We still have the benefits of modularized and reusable dag code and
> > > > > can declare dependencies between them. And with the new SubDagOperator
> > > > > (see AIP or draft PR), we can use the same dag_factory function for
> > > > > generating 1 dag, a lot of dynamic dags, or a SubDag (in this case, it
> > > > > will just extract all underlying tasks and append them to the root dag).
> > > > >
> > > > >    - Then it gets to Ash's idea of replacing subdag with a simpler
> > > > >    concept: the proposed change basically drains out the contents of a
> > > > >    SubDag, and the operator becomes more like
> > > > >    ExtractSubdagTasksAndAppendToRootdagOperator (forgive me for the
> > > > >    crazy name..). In this case, is it still necessary to keep the
> > > > >    concept of subdag when it is nothing more than a name?
> > > > >
> > > > > That's why the TaskGroup idea comes up. Thanks to Chris Palmer for
> > > > > helping conceptualize the functionality of TaskGroup; I will just paste
> > > > > it here.
> > > > >
> > > > > >   - Tasks can be added to a TaskGroup
> > > > > >   - You *can* have dependencies between Tasks in the same TaskGroup, but
> > > > > >   *cannot* have dependencies between a Task in a TaskGroup and either a
> > > > > >   Task in a different TaskGroup or a Task not in any group
> > > > > >   - You *can* have dependencies between a TaskGroup and either other
> > > > > >   TaskGroups or Tasks not in any group
> > > > > >   - The UI will by default render a TaskGroup as a single "object", but
> > > > > >   which you expand or zoom into in some way
> > > > > >   - You'd need some way to determine what the "status" of a TaskGroup
> > > > > >   was at least for UI display purposes
> > > > >
> > > > > I agree with Chris:
> > > > > - From the backend's view (scheduler & executor), I think TaskGroup
> > > > > should be ignored during execution (unless we decide to implement some
> > > > > metadata operations that allow starting/stopping a group of tasks etc.).
> > > > > - From the UI's view, it should be able to pick up the individual
> > > > > tasks' status and then determine the TaskGroup's status.
> > > > >
> > > > > Bin
> > > > >
> > > > > On Fri, Jun 12, 2020 at 10:28 AM Daniel Imberman <
> > > > > daniel.imberman@gmail.com> wrote:
> > > > >
> > > > > >> I hadn’t thought about using the `>>` operator to tie dags together,
> > > > > >> but I think that sounds pretty great! I wonder if we could essentially
> > > > > >> write in the ability to set dependencies to all starter-tasks for that
> > > > > >> DAG.
> > > > > >>
> > > > > >> I’m personally ok with SubDag being a mostly UI concept. It doesn’t
> > > > > >> need to execute separately; you’re just adding more tasks to the queue
> > > > > >> that will be executed when there are resources available.
> > > > >>
> > > > > >> On Fri, Jun 12, 2020 at 9:45 AM, Chris Palmer <chris@crpalmer.com>
> > > > > >> wrote:
> > > > > >> I agree that SubDAGs are an overly complex abstraction. I think what
> > > > > >> is needed/useful is a TaskGroup concept. On a high level I think you
> > > > > >> want this functionality:
> > > > > >>
> > > > > >> - Tasks can be added to a TaskGroup
> > > > > >> - You *can* have dependencies between Tasks in the same TaskGroup, but
> > > > > >> *cannot* have dependencies between a Task in a TaskGroup and either a
> > > > > >> Task in a different TaskGroup or a Task not in any group
> > > > > >> - You *can* have dependencies between a TaskGroup and either other
> > > > > >> TaskGroups or Tasks not in any group
> > > > > >> - The UI will by default render a TaskGroup as a single "object", but
> > > > > >> which you expand or zoom into in some way
> > > > > >> - You'd need some way to determine what the "status" of a TaskGroup
> > > > > >> was at least for UI display purposes
> > > > > >>
> > > > > >> Not sure if it would need to be a top level object with its own
> > > > > >> database table and model or just another attribute on tasks. I think
> > > > > >> you could build it in a way such that from the scheduler's point of
> > > > > >> view a DAG with TaskGroups doesn't get treated any differently. So it
> > > > > >> really just becomes a shortcut for setting dependencies between sets of
> > > > > >> Tasks, and allows the UI to simplify the render of the DAG structure.
> > > > >>
> > > > >> Chris
> > > > >>
> > > > > >> On Fri, Jun 12, 2020 at 12:12 PM Dan Davydov
> > > > > >> <ddavydov@twitter.com.invalid> wrote:
> > > > >>
> > > > > >> > Agree with James (and think it's actually the more important issue
> > > > > >> > to fix), but I am still convinced Ash's idea is the right way forward
> > > > > >> > (just it might require a bit more work to deprecate than adding
> > > > > >> > visual grouping in the UI).
> > > > > >> >
> > > > > >> > There was a previous thread about this FYI with more context on why
> > > > > >> > subdags are bad and potential solutions:
> > > > > >> > https://www.mail-archive.com/dev@airflow.apache.org/msg01202.html .
> > > > > >> > A solution I outline there to James's problem is e.g. enabling the `>>`
> > > > > >> > operator for Airflow operators to work with DAGs as well. I see this
> > > > > >> > being separate from Ash's solution for DAG grouping in the UI, but one
> > > > > >> > of the two items required to replace all existing subdag functionality.
> > > > > >> >
> > > > > >> > I've been working with subdags for 3 years and they are always a
> > > > > >> > giant pain to use. They are a constant source of user confusion and
> > > > > >> > breakages during upgrades. Would love to see them gone :).
> > > > >> >
> > > > > >> > On Fri, Jun 12, 2020 at 11:11 AM James Coder <jcoder01@gmail.com>
> > > > > >> > wrote:
> > > > > >> > > I'm not sure I totally agree it's just a UI concept. I use the
> > > > > >> > > subdag operator to simplify dependencies too. If you have a group of
> > > > > >> > > tasks that need to finish before another group of tasks start, using
> > > > > >> > > a subdag is a pretty quick way to set those dependencies, and I think
> > > > > >> > > it also makes the dag code easier to follow.
> > > > >> > >
> > > > > >> > > On Fri, Jun 12, 2020 at 9:53 AM Kyle Hamlin <hamlin.kn@gmail.com>
> > > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > I second Ash’s grouping concept.
> > > > >> > > >
> > > > > >> > > > On Fri, Jun 12, 2020 at 5:10 AM Ash Berlin-Taylor <ash@apache.org>
> > > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > Question:
> > > > >> > > > >
> > > > >> > > > > Do we even need the SubDagOperator anymore?
> > > > >> > > > >
> > > > > >> > > > > Would removing it entirely and just replacing it with a UI
> > > > > >> > > > > grouping concept be conceptually simpler, less to get wrong, and
> > > > > >> > > > > closer to what users actually want to achieve with subdags?
> > > > > >> > > > >
> > > > > >> > > > > With your proposed change, tasks in subdags could start running
> > > > > >> > > > > in parallel (a good change) -- so should we not also just
> > > > > >> > > > > _entirely_ remove the concept of a sub dag and replace it with
> > > > > >> > > > > something simpler?
> > > > > >> > > > >
> > > > > >> > > > > Problems with subdags (I think. I haven't used them extensively
> > > > > >> > > > > so may be wrong on some of these):
> > > > > >> > > > > - They need their own dag_id, but it has(?) to be of the form
> > > > > >> > > > > `parent_dag_id.subdag_id`.
> > > > > >> > > > > - They need their own schedule_interval, but it has to match the
> > > > > >> > > > > parent dag
> > > > > >> > > > > - Sub dags can be paused on their own. (Does it make sense to do
> > > > > >> > > > > this? Pausing just a sub dag would mean the sub dag would never
> > > > > >> > > > > execute, so the SubDagOperator would fail too.)
> > > > > >> > > > > - You had to choose the executor to operate a subdag with --
> > > > > >> > > > > always a bit of a kludge.
> > > > >> > > > >
> > > > >> > > > > Thoughts?
> > > > >> > > > >
> > > > >> > > > > -ash
> > > > >> > > > >
> > > > > >> > > > > On Jun 12 2020, at 12:01 pm, Ash Berlin-Taylor <ash@apache.org>
> > > > > >> > > > > wrote:
> > > > >> > > > >
> > > > > >> > > > > > Work on sub-dags is much needed, I'm excited to see how this
> > > > > >> > > > > > progresses.
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > >> - *Unpack SubDags during dag parsing*: This rewrites the
> > > > > >> > > > > >> *DagBag.bag_dag* method to unpack subdag while parsing, and it
> > > > > >> > > > > >> will give a flat structure at the task level
> > > > > >> > > > > >
> > > > > >> > > > > > The serialized_dag representation already does this I think. At
> > > > > >> > > > > > least if I've understood your idea here correctly.
> > > > >> > > > > >
> > > > >> > > > > > -ash
> > > > >> > > > > >
> > > > >> > > > > >
> > > > > >> > > > > > On Jun 12 2020, at 9:51 am, Xinbin Huang <bin.huangxb@gmail.com>
> > > > > >> > > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > >> Hi everyone,
> > > > >> > > > > >>
> > > > > >> > > > > >> I am sending this message to collect feedback on AIP-34, which is
> > > > > >> > > > > >> about rewriting SubDagOperator. This was previously mentioned
> > > > > >> > > > > >> briefly in the discussion about what needs to be done for Airflow
> > > > > >> > > > > >> 2.0, and one of the ideas is to make SubDagOperator attach tasks
> > > > > >> > > > > >> back to the root DAG.
> > > > > >> > > > > >>
> > > > > >> > > > > >> AIP-34 focuses on solving SubDagOperator-related issues by
> > > > > >> > > > > >> reattaching all tasks back to the root dag while respecting
> > > > > >> > > > > >> dependencies during parsing. The original grouping effect on the
> > > > > >> > > > > >> UI will be achieved through grouping related tasks by metadata.
> > > > > >> > > > > >>
> > > > > >> > > > > >> This also makes the dag_factory function more reusable because
> > > > > >> > > > > >> you don't need to have parent_dag_name and child_dag_name in the
> > > > > >> > > > > >> function signature anymore.
> > > > >> > > > > >>
> > > > >> > > > > >> Changes proposed:
> > > > >> > > > > >>
> > > > > >> > > > > >> - *Unpack SubDags during dag parsing*: This rewrites the
> > > > > >> > > > > >> *DagBag.bag_dag* method to unpack subdag while parsing, and it
> > > > > >> > > > > >> will give a flat structure at the task level
> > > > > >> > > > > >> - *Simplify SubDagOperator*: The new SubDagOperator acts like a
> > > > > >> > > > > >> container and most of the original methods are removed. The
> > > > > >> > > > > >> signature is also changed to *subdag_factory* with *subdag_args*
> > > > > >> > > > > >> and *subdag_kwargs*. This is similar to the PythonOperator
> > > > > >> > > > > >> signature.
> > > > > >> > > > > >> - *Add a TaskGroup model and add current_group & parent_group
> > > > > >> > > > > >> attributes to BaseOperator*: This metadata is used to group tasks
> > > > > >> > > > > >> for rendering at UI level. It may potentially extend further to
> > > > > >> > > > > >> group arbitrary tasks outside the context of subdag to allow
> > > > > >> > > > > >> group-level operations (i.e. stop/trigger a group of tasks within
> > > > > >> > > > > >> the dag)
> > > > > >> > > > > >> - *Webserver UI for SubDag*: Proposed UI modification to allow
> > > > > >> > > > > >> (un)collapsing a group of tasks for a flat structure to pair with
> > > > > >> > > > > >> the first change instead of the original hierarchical structure.
> > > > >> > > > > >>
> > > > >> > > > > >>
> > > > > >> > > > > >> Please see related documents and PRs for details:
> > > > > >> > > > > >> AIP:
> > > > > >> > > > > >> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-34+Rewrite+SubDagOperator
> > > > > >> > > > > >>
> > > > > >> > > > > >> Original Issue: https://github.com/apache/airflow/issues/8078
> > > > > >> > > > > >> Draft PR: https://github.com/apache/airflow/pull/9243
> > > > >> > > > > >>
> > > > > >> > > > > >> Please let me know if there are any aspects that you
> > > > > >> > > > > >> agree/disagree with or that need more clarification (especially
> > > > > >> > > > > >> the third change regarding TaskGroup). Any comments are welcome
> > > > > >> > > > > >> and I am looking forward to them!
> > > > >> > > > > >>
> > > > >> > > > > >> Cheers
> > > > >> > > > > >> Bin
> > > > >> > > > > >>
> > > > >> > > > >
> > > > >> > > > --
> > > > >> > > > Kyle Hamlin
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >
> > > > >
> > > >
> > > > --
> > > > Thanks & Regards
> > > >  Poornima
> > > >
> > >
> >
>

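The flattening idea from the thread above (pull subdag tasks into the root DAG and tag each one with current_group and parent_group so the UI can regroup them) can be sketched roughly as follows. This is a toy model under stated assumptions: the Task and Group dataclasses, flatten() and group_status() are hypothetical names for this example, and the status roll-up rule is invented for illustration, since the thread only says that a group status would need to be derived from the individual task statuses.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Task:
    task_id: str
    current_group: Optional[str] = None  # group the task belongs to
    parent_group: Optional[str] = None   # enclosing group, if any


@dataclass
class Group:
    """Hypothetical nested structure standing in for a DAG with subdags."""
    group_id: str
    tasks: List[Task] = field(default_factory=list)
    subgroups: List["Group"] = field(default_factory=list)


def flatten(group: Group, parent: Optional[str] = None) -> List[Task]:
    """Return every task as one flat list, tagged with group metadata."""
    flat = [
        Task(t.task_id, current_group=group.group_id, parent_group=parent)
        for t in group.tasks
    ]
    for sub in group.subgroups:
        flat.extend(flatten(sub, parent=group.group_id))
    return flat


def group_status(states: List[str]) -> str:
    """Assumed roll-up rule for UI display: failed > in_progress > success."""
    if "failed" in states:
        return "failed"
    if any(s != "success" for s in states):
        return "in_progress"
    return "success"


root = Group(
    "example_dag",
    tasks=[Task("start"), Task("end")],
    subgroups=[
        Group("section-1", tasks=[Task("section-1-task-1"),
                                  Task("section-1-task-2")]),
    ],
)
flat_tasks = flatten(root)
```

The scheduler would only ever see flat_tasks; the two metadata fields are enough for a UI to rebuild the collapsed view.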