airflow-users mailing list archives

From Nicolas Paris <nicolas.pa...@riseup.net>
Subject Re: reuse dags
Date Fri, 16 Aug 2019 19:54:41 GMT

> One way is "airflow as python data dev framework".
> Alternatively, you can go more "airflow as scheduler".

Indeed I am using it as a scheduler or orchestrator (mostly Spark, Talend,
SQL files, or bash). All this mess needs some experience with airflow to
avoid ending up with thousands of lines of standalone python dags with no
reusable code and unmanageable copy/paste spaghetti.

> Spend some time with the source code for operators (don't forget contrib
> folder).

Good advice, thanks. I will dig into this, and hopefully come up with a
decent overall design.


On Fri, Aug 16, 2019 at 12:38:37PM -0700, Daniel Standish wrote:
> 
> When you add a dag to an operator as an argument, you are just saying "associate
> this operator with this dag".
> 
> The "with dag" context manager just makes it so you can optionally omit the dag
> param.   I find it helps with readability to use "with dag".
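
For illustration, the two styles side by side (a minimal sketch, assuming
Airflow 1.x import paths):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    # style 1: pass the dag explicitly to every operator
    dag = DAG('explicit_dag', start_date=datetime(2019, 1, 1),
              schedule_interval='@daily')
    t1 = BashOperator(task_id='say_hello', bash_command='echo hello', dag=dag)

    # style 2: the context manager assigns the dag for you
    with DAG('context_dag', start_date=datetime(2019, 1, 1),
             schedule_interval='@daily') as other_dag:
        t2 = BashOperator(task_id='say_hello', bash_command='echo hello')
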
>  
> 
>     Correct me if I am wrong, this might be a way to define a bunch of
>     operators in some library files and import them into dag files?
> 
> 
> This is precisely what you do.
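
A minimal sketch of that layout (hypothetical module and file names; the
shared module only has to be importable from the dag files):

    # plugins_lib/reusable_tasks.py : shared library, no DAG objects in here
    from airflow.operators.bash_operator import BashOperator

    def make_export_task(table, dag=None):
        # the same export step, reusable from any dag file
        return BashOperator(
            task_id='export_' + table,
            bash_command='bash export_table.sh ' + table,
            dag=dag,
        )

    # dags/sales_pipeline.py : one of many dag files reusing the library
    from datetime import datetime
    from airflow import DAG
    from plugins_lib.reusable_tasks import make_export_task

    with DAG('sales_pipeline', start_date=datetime(2019, 1, 1)) as dag:
        export_sales = make_export_task('sales')
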
> 
> The operator is the main unit of abstraction.  If you have a repetitive
> process, you can write an operator for it once and use it in many dags.
> 
> You can add params to an operator to allow for different behavior.
> 
> You can use templates for more dynamic behavior.
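
For instance, a hypothetical reusable operator with one param and one
templated field might look roughly like this (Airflow 1.x style, not an
existing operator):

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults

    class LoadTableOperator(BaseOperator):
        # fields listed here are rendered with Jinja before execute() runs,
        # so every dag reusing the operator gets per-run values like {{ ds }}
        template_fields = ('partition',)

        @apply_defaults
        def __init__(self, table, partition='{{ ds }}', *args, **kwargs):
            super(LoadTableOperator, self).__init__(*args, **kwargs)
            self.table = table          # param: differs per dag
            self.partition = partition  # templated: differs per run

        def execute(self, context):
            self.log.info('loading %s, partition %s', self.table, self.partition)
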
> 
> Hooks are another layer of abstraction.  A hook can be as simple as fetching
> creds.  Or if you have a family of operators that use the same connection, you
> can abstract a lot of functionality to the hook layer, and different operators
> can call different hook methods in different sequences to control behavior.
> Look at some of the GCP or AWS hooks.
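
A hedged sketch of that split, with the connection handling in a hypothetical
hook and the operators left to pick which methods they call:

    from airflow.hooks.base_hook import BaseHook

    class WarehouseHook(BaseHook):
        # owns the connection and the low-level calls; operators stay thin
        def __init__(self, conn_id='warehouse_default'):
            self.conn_id = conn_id

        def get_conn(self):
            # creds come from the Airflow connection store, not from the dags
            conn = self.get_connection(self.conn_id)
            return conn  # a real hook would build a client from conn.host etc.

        def run_sql(self, sql):
            self.log.info('running on %s: %s', self.conn_id, sql)

        def copy_file(self, src, dst):
            self.log.info('copying %s to %s', src, dst)
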
> 
> Spend some time with the source code for operators (don't forget contrib
> folder).
> 
> There are different ways to use airflow.  One way is "airflow as python data
> dev framework".  With this approach you abstract your python processes into
> operators and lean on airflow to manage state.
> 
> Alternatively, you can go more "airflow as scheduler".  Here maybe you don't
> make so many custom operators but just schedule processes to run elsewhere e.g.
> on spark or snowflake or whatever.  You can use this approach with python jobs
> too, executing with bash operator or python operator or docker -- in this case
> airflow may not know anything about your processes (e.g. their state or
> creds).    
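
In that style a dag can be little more than a schedule wrapped around an
existing command; a sketch with a hypothetical spark-submit call:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    with DAG('nightly_spark', start_date=datetime(2019, 1, 1),
             schedule_interval='@daily') as dag:
        # airflow only triggers the job and tracks the exit code;
        # the actual processing state lives in Spark, not in airflow
        submit = BashOperator(
            task_id='submit_job',
            bash_command='spark-submit --master yarn /opt/jobs/clean_events.py {{ ds }}',
        )
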
> 
> 
> 
> 
> 
> On Fri, Aug 16, 2019 at 12:13 PM Nicolas Paris <nicolas.paris@riseup.net>
> wrote:
> 
>     > I would say the main reusable objects in airflow are the operator and the hook,
> 
>     About operators, most examples in the documentation use them by specifying
>     their dag as an argument. For this reason I thought they could not be
>     reused across python files. E.g.:
>     > run_this = BashOperator(task_id='run_after_loop', bash_command='echo 1', dag=dag)
> 
> 
>     After digging, I found that note:
>     > Added in Airflow 1.8
>     >
>     > DAGs can be used as context managers to automatically assign new
>     > operators to that DAG.
>     >
>     > with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
>     >     op = DummyOperator('op')
> 
>     Correct me if I am wrong, this might be a way to define a bunch of
>     operators in some library files and import them into dag files?
> 
>     Thanks
> 
> 
>     On Wed, Aug 14, 2019 at 04:13:14PM -0700, Daniel Standish wrote:
>     > I would say the main reusable objects in airflow are the operator and the
>     > hook, not the dag.  These are the primary building blocks that you use to
>     > construct your pipelines.
>     >
>     > That said you may find it useful to look at TriggerDagRunOperator.  You can
>     > have your dag's behavior change based on execution date and dag_run conf,
>     > which are available to all tasks as template context variables.
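
A hedged sketch of the "meta-pipeline" idea with TriggerDagRunOperator (dag
ids taken from the original question; Airflow 1.x import path):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    with DAG('dag3', start_date=datetime(2019, 1, 1), schedule_interval=None) as dag:
        run_dag1 = TriggerDagRunOperator(task_id='run_dag1', trigger_dag_id='dag1')
        run_dag2 = TriggerDagRunOperator(task_id='run_dag2', trigger_dag_id='dag2')
        # triggering is fire and forget: run_dag2 fires as soon as dag1 has been
        # triggered, not after it has finished; waiting needs a sensor on dag1
        run_dag1 >> run_dag2
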
>     >
>     > You may also look into generating your dags dynamically.
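
And a minimal sketch of dynamic dag generation (hypothetical table names; the
point is that dags registered in the module's globals are picked up by the
scheduler):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    def build_load_dag(table):
        dag = DAG('load_' + table, start_date=datetime(2019, 1, 1),
                  schedule_interval='@daily')
        BashOperator(task_id='load', bash_command='bash load.sh ' + table, dag=dag)
        return dag

    # one dag per table, all generated from the same template function
    for table in ('sales', 'clients', 'orders'):
        globals()['load_' + table] = build_load_dag(table)
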
>     >
>     >
>     >
>     >
>     > On Wed, Aug 14, 2019 at 2:58 PM Nicolas Paris <nicolas.paris@riseup.net> wrote:
>     >
>     >     Hi Damian
>     >
>     >     > I believe there are some big improvements coming in Airflow 2 for
>     >     > subdags
>     >
>     >     All right, I won't use them until v2 is available.
>     >
>     >
>     >     > In terms of what you are saying there, where you have dags depend on
>     >     > each other: we use a DummyOperator as an end point for the DAG, which
>     >     > always has the same name, and an ExternalTaskSensor which waits for
>     >     > that end point to finish successfully. One warning though: you have to
>     >     > be on exactly the same schedule for this to work, or you need to do
>     >     > some datetime conversion logic to provide the correct execution_date.
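
A minimal sketch of that pattern (hypothetical dag and task ids; Airflow 1.x
import paths; both dags on the same daily schedule):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    # upstream dag ends with a conventionally named task
    with DAG('upstream', start_date=datetime(2019, 1, 1),
             schedule_interval='@daily') as upstream:
        end_point = DummyOperator(task_id='end_point')

    # downstream dag on the same schedule waits for that task to succeed
    with DAG('downstream', start_date=datetime(2019, 1, 1),
             schedule_interval='@daily') as downstream:
        wait = ExternalTaskSensor(
            task_id='wait_for_upstream',
            external_dag_id='upstream',
            external_task_id='end_point',
        )
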
>     >
>     >     Interesting but quite complex. I am looking for a simple, built-in way
>     >     to import dags.
>     >
>     >     Reusing and factorizing code is quite a common need. So far airflow
>     >     apparently does not provide a solution for that purpose.
>     >
>     >     On Mon, Aug 12, 2019 at 01:34:11PM +0000, Shaw, Damian P. wrote:
>     >     > Hi Nicolas,
>     >     >
>     >     > I don't have a lot of experience with subdags as there are a lot of
>     >     > warnings around them; I would strongly recommend reading this before
>     >     > using them: https://www.astronomer.io/guides/subdags/
>     >     >
>     >     > I believe there are some big improvements coming in Airflow 2 for
>     >     > subdags, where they will act more like regular dags and work in the
>     >     > way you expect.
>     >     >
>     >     > In terms of what you are saying there, where you have dags depend on
>     >     > each other: we use a DummyOperator as an end point for the DAG, which
>     >     > always has the same name, and an ExternalTaskSensor which waits for
>     >     > that end point to finish successfully. One warning though: you have to
>     >     > be on exactly the same schedule for this to work, or you need to do
>     >     > some datetime conversion logic to provide the correct execution_date.
>     >     >
>     >     > Regards,
>     >     > Damian
>     >     >
>     >     > -----Original Message-----
>     >     > From: Nicolas Paris [mailto:nicolas.paris@riseup.net]
>     >     > Sent: Sunday, August 11, 2019 7:44 PM
>     >     > To: users@airflow.apache.org
>     >     > Subject: reuse dags
>     >     >
>     >     > Hi
>     >     >
>     >     > From the documentation I think I spotted the way of reusing dags from
>     >     > other python files: this can be done by creating "subdags".
>     >     >
>     >     > I have created several pipelines (dag1.py, dag2.py, ...) in several
>     >     > python files. Right now, I'd like to build a meta-pipeline
>     >     > dag3 = dag1 >> dag2.
>     >     >
>     >     > Do I have to convert dag1 and dag2 to subdags and import them in dag3?
>     >     > Or is there a more friendly way?
>     >     >
>     >     > Thanks
>     >     > --
>     >     > nicolas
>     >     >
>     >     >
>     >
>     >     --
>     >     nicolas
>     >
> 
>     --
>     nicolas
> 

-- 
nicolas
