airflow-dev mailing list archives

From Gerard Toonstra <gtoons...@gmail.com>
Subject Re: Split DAG code into several files.
Date Tue, 27 Jun 2017 06:18:30 GMT
For Airflow to find DAGs, a .py file is read. The file should contain
both the strings "DAG" and "airflow" somewhere to be considered a
potential DAG file. Then there are some additional rules that decide
whether it actually gets scheduled or not. The log files for the DAG
file processors are not the same as the main scheduler's, so you could
look at those explicitly and see if there is a reason your files get
rejected.
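As a rough sketch of that pre-filter (simplified from my reading of the
code; the exact check may differ between Airflow versions):

    def might_contain_dag(file_path):
        # Only files whose source mentions both strings are handed on
        # to the DAG parser (Airflow's "safe mode" pre-filter).
        with open(file_path, 'rb') as f:
            content = f.read()
        return all(s in content for s in (b'DAG', b'airflow'))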


You can find this code in "dag_processing.list_py_file_paths()", which
detects potential DAG files among the files it finds. It builds the
list of files that should be considered, each of which potentially
contains a DAG for scheduling.


Then there's models.DagBag.process_file, which is called later and
actually attempts to parse the files and create DAG objects from them.
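If you want to see what that step produces, one option (assuming you
run it on the Airflow host, against the configured dags folder) is to
build a DagBag from a Python shell and inspect it:

    from airflow.models import DagBag

    bag = DagBag()            # parses everything under the dags folder
    print(list(bag.dags))     # dag_ids that were successfully loaded
    print(bag.import_errors)  # file path -> parse error, for failures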


From your code, I think you've already noticed this page on the subject:
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
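One thing to note: process_file only registers DAG objects that it
finds at module level, in the file's globals. So with your convention,
each environment file has to end up with the DAG bound to a module-level
variable, e.g. (a sketch; build_dag is a hypothetical factory you would
add to the common module):

    # cdna_daily_dev.py (sketch)
    import cdna_daily_common

    cdna_daily_common.dag_name = 'cdna_daily_dev_v2'
    # ... set the remaining environment settings ...

    # The scheduler only picks up DAG objects bound at module level,
    # so expose the constructed DAG as a global here:
    dag = cdna_daily_common.build_dag()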


Hope that helps!


Rgds,

Gerard




On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
umanga.pdn@gmail.com> wrote:

> Greetings,
>
> We are trying to extract common DAG creation code into a single source file
> and create a separate file per environment importing this "common
> logic".
>
> "cdna_daily_common.py" has the common DAG creation code. Files for different
> environments are like "cdna_daily_dev.py", "cdna_daily_stg.py", etc.
>
> However, when I copy these files into the "dags" folder, they don't show up
> in the Airflow UI. How can I make these DAGs visible in the Airflow UI while
> following this coding convention?
>
>
> ====================
> cdna_daily_common.py
> ====================
> from airflow import DAG
> from airflow.contrib.hooks.ssh_hook import SSHHook  # import path varies by Airflow version
>
> global airflow_host_port
> global env
> global alert_email
> global spdb_sync_prefix
> global post_validate_prefix
> global schedule_interval
> global dag_name  # PROD: cdna_daily_prd
> global job_execution_prefix
>
> global dag
>
> def initDAG():
>     # define SSHHook
>     sshHook = SSHHook()
>     global dag
>     dag = DAG(
>         dag_id=dag_name,
>         default_args=default_args,
>         schedule_interval="0 2 * * *"
>     )
>     # ... create the rest of the DAG
>
>
> def execDAG():
>     initDAG()
>     dag.cli()
>
>
>
>
> ==================
> cdna_daily_dev.py
> ==================
> import cdna_daily_common
> cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> cdna_daily_common.env='dev'
> cdna_daily_common.alert_email='dev-dsd-mon@mail.com'
> cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> cdna_daily_common.post_validate_prefix='echo PostVal'
> cdna_daily_common.schedule_interval='0 2 * * *'
> cdna_daily_common.dag_name='cdna_daily_dev_v2'
> cdna_daily_common.job_execution_prefix = (
>     "python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j")
>
> if __name__ == "__main__":
>     cdna_daily_common.execDAG()
>
