airflow-users mailing list archives

From "Saleil Bhat (BLOOMBERG/ 919 3RD A)" <>
Subject Re: Dynamically adding DAGs to Airflow
Date Sat, 09 May 2020 20:30:09 GMT
Ok got it, thanks for the clarification. Your proposed solution should work for us, at least
as a starting point.

- Saleil

From:
At: 05/09/20 10:46:57
To: Saleil Bhat (BLOOMBERG/ 919 3RD A)
Subject: Re: Dynamically adding DAGs to Airflow

The Python files get parsed repeatedly by the various components of Airflow, which then
have access to all the DAG objects in the global namespace of each of those files. There
is no real sense of a DAG existing outside of Python files.

The dynamic portion of DAG generation is really just saying that each Python file doesn't
have to contain a fixed number of DAGs. The DAGs generated by a file can be based on some
other resource that changes independently.

For your situation, one approach would be to have a configuration directory with one config
file (JSON, YAML, or similar) for each requested workflow, containing the specific parameters.
Then you'd have a Python file that loops through that directory and creates a DAG per config
file found.

When a new workflow request comes in, all you would need to do is create a new config file
and put it in the configuration directory. Then, when the scheduler next parses your Python
file, it will see the new DAG.
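
A minimal sketch of that pattern, assuming JSON configs in a workflow_configs directory next
to the DAG file (the directory name, config fields, and task body here are all invented for
illustration):

import json
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Hypothetical layout: one JSON config per requested workflow.
CONFIG_DIR = os.path.join(os.path.dirname(__file__), "workflow_configs")

def build_dag(config):
    dag = DAG(
        dag_id=config["dag_id"],
        schedule_interval=config["schedule_interval"],
        start_date=datetime.strptime(config["start_date"], "%Y-%m-%d"),
    )
    with dag:
        PythonOperator(
            task_id="run_workflow",
            python_callable=lambda: print(config["params"]),
        )
    return dag

# Register one DAG per config file in the module's global namespace,
# which is where Airflow's parser looks for DAG objects.
for name in os.listdir(CONFIG_DIR):
    if name.endswith(".json"):
        with open(os.path.join(CONFIG_DIR, name)) as f:
            config = json.load(f)
        globals()[config["dag_id"]] = build_dag(config)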

You'll have to remember that all the necessary files will need to be on, or accessible to,
the Airflow scheduler, workers, and webserver (unless the stateless webserver work is complete).
So if you are keeping the configs as local files, you'll have to push any new configs to all
of those machines. You could also keep the configs somewhere centralized, like AWS S3 or Google
Cloud Storage, but keep in mind that DAG files do get parsed repeatedly, and depending on how
many configs you have, the overhead of pulling down that many files from the cloud on every
parse might become burdensome.
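
For the centralized variant, the loop might instead read its configs from S3. A rough sketch,
assuming boto3 with credentials already configured, a hypothetical bucket name, and the
build_dag helper from the earlier sketch; note that the listing and downloads below run on
every parse, which is exactly the overhead in question:

import json

import boto3  # assumes AWS credentials are available on each Airflow machine

BUCKET = "my-workflow-configs"  # hypothetical bucket name

s3 = boto3.client("s3")
# Both the listing and the per-object downloads happen on every scheduler parse.
response = s3.list_objects_v2(Bucket=BUCKET, Prefix="configs/")
for obj in response.get("Contents", []):
    body = s3.get_object(Bucket=BUCKET, Key=obj["Key"])["Body"].read()
    config = json.loads(body)
    globals()[config["dag_id"]] = build_dag(config)  # build_dag as sketched above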


On Sat, May 9, 2020, 1:24 PM Saleil Bhat (BLOOMBERG/ 919 3RD A) <> wrote:

Thanks for the response! Perhaps it will be easier if I explain my use case, and you can tell
me if I'm missing an obvious, easier way to do what I'm trying to do.

We are building an infrastructure-as-a-service platform where users can kick off a workflow
for themselves and, in their request, specify the schedule_interval and start_date. The majority
of the workflow is the same for any user request, with only some config parameters and the
schedule differing for each user.

However, my understanding is that the "unit of scheduling" in Airflow is a DAG. This means
that in order to leverage Airflow's scheduling functionality, each user's request needs to be
represented by its own DAG with the specified schedule_interval and start_date. One way to do
this is to make a DAG template file, populate it with the user's request data, and write the
resulting .py file to the DAG_FOLDER.
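
Concretely, I'm imagining something like the following; the template string, request fields,
and write_dag_file helper are all made up, and a real template would contain the full shared
workflow rather than an empty DAG:

import os

DAG_TEMPLATE = '''\
from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id="{dag_id}",
    schedule_interval="{schedule_interval}",
    start_date=datetime({year}, {month}, {day}),
)
'''

def write_dag_file(dag_folder, request):
    """Render one user's request into a .py file the scheduler will pick up."""
    path = os.path.join(dag_folder, request["dag_id"] + ".py")
    with open(path, "w") as f:
        f.write(DAG_TEMPLATE.format(**request))

write_dag_file("/path/to/dag_folder", {
    "dag_id": "user_123_workflow",
    "schedule_interval": "@daily",
    "year": 2020, "month": 5, "day": 1,
})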

I was just wondering if there's a way to do this directly in the running Airflow scheduler
process itself; that is, directly inject a DAG definition into the scheduler without writing
a physical .py file to disk.  Alternatively, if not, is it possible to have multiple schedules
for a single DAG (in which case, we would not need to have a DAG per user request)? 


From:
At: 05/08/20 22:28:31
To: Saleil Bhat (BLOOMBERG/ 919 3RD A)
Subject: Re: Dynamically adding DAGs to Airflow

Airflow will continue to periodically look for new DAGs while running, whether they are
dynamic or not.
Does your DAG show up when you run airflow list_dags? If so, it will show up in the webserver
sooner or later. If it does not, then it's likely something is wrong with your DAG file.

There has been talk of changing Airflow's behavior of automatically parsing every DAG over
and over. This could reduce unnecessary processing and make "expensive" dynamic DAGs feasible,
but I don't think this has been implemented yet.

On Fri, May 8, 2020 at 3:55 PM Saleil Bhat (BLOOMBERG/ 919 3RD A) <> wrote:

Hey all, 

I'm new to Airflow, and I have a question concerning creating DAGs on the fly. 
I saw a snippet in the documentation which suggests you can programmatically create DAGs.

My question is, can I invoke code similar to this to create a new DAG when Airflow is already
running? For example, suppose I have a DAG factory which takes some config parameters and
constructs a DAG. Would it be possible to use the CLI and/or REST API to trigger a call to
this DAG factory to add a new DAG to my Airflow system? 
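
For concreteness, the kind of factory I have in mind looks something like this (all names
and parameters here are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def dag_factory(dag_id, schedule_interval, start_date, params):
    """Construct a DAG from per-user config parameters."""
    dag = DAG(dag_id=dag_id, schedule_interval=schedule_interval,
              start_date=start_date)
    with dag:
        PythonOperator(
            task_id="run_workflow",
            python_callable=lambda: print("running with", params),
        )
    return dag

my_dag = dag_factory("user_123_workflow", "@daily",
                     datetime(2020, 5, 1), {"user_id": 123})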

