airflow-dev mailing list archives

From Ashika Umanga Umagiliya <umanga....@gmail.com>
Subject Re: Split DAG code into several files.
Date Wed, 28 Jun 2017 02:01:42 GMT
My current DAGs work fine. The problem is that, since we have almost the
same logic for the DEV, STG, and PROD environments (with only minor
parameter changes), keeping three different source files causes code
redundancy.

I am trying to refactor the current "common DAG logic" into one file and
separate out only the parameters that change depending on the environment.
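One way to structure such a refactoring (a minimal sketch, not the code from this thread — `create_dag` and its parameter names are hypothetical) is a factory function that takes the environment-specific values and returns the DAG, with each per-environment file assigning the result to a module-level variable. Airflow's DagBag only discovers DAG objects that sit at the top level of a module, which is why code reachable only via `if __name__ == "__main__"` is never seen by the scheduler. A plain dict stands in for `airflow.DAG` below so the sketch runs without Airflow installed:

```python
# cdna_daily_common.py -- hypothetical factory sketch
def create_dag(dag_name, env, alert_email, schedule_interval):
    # In real code this would build and return an airflow.DAG;
    # a plain dict stands in here so the sketch runs without Airflow.
    dag = {
        "dag_id": dag_name,
        "env": env,
        "alert_email": alert_email,
        "schedule_interval": schedule_interval,
    }
    # ... add operators to the DAG here ...
    return dag


# cdna_daily_dev.py -- the module-level assignment is what lets the
# scheduler's DagBag pick the DAG up when it imports this file.
dag = create_dag(
    dag_name="cdna_daily_dev_v2",
    env="dev",
    alert_email="dev-dsd-mon@mail.com",
    schedule_interval="0 2 * * *",
)
```

With a real `airflow.DAG` returned instead of the dict, the per-environment files stay tiny (just parameter values and one assignment), while all operator wiring lives in the common module.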

On Tue, Jun 27, 2017 at 4:18 PM, Gerard Toonstra <gtoonstra@gmail.com>
wrote:

> you should probably use:
>
> airflow test tutorial print_date 2015-06-01
>
> syntax, as described here:
> https://airflow.incubator.apache.org/tutorial.html
>
> When you execute "python <dagname>.py", it picks up code from the
> __main__ section, and that's likely different.  I'm not sure though.
>
>
> On Tue, Jun 27, 2017 at 8:38 AM, Ashika Umanga Umagiliya <
> umanga.pdn@gmail.com> wrote:
>
> > Thanks for the reply.
> >
> > When I execute "python cdna_daily_dev.py list_tasks" it gives the
> > correct output, as expected. Does this mean that my DAG is valid?
> >
> >
> > [bisuser@ins-server dags]$ python cdna_daily_dev.py list_tasks
> > [2017-06-27 15:34:29,785] {__init__.py:36} INFO - Using executor
> > CeleryExecutor
> > [2017-06-27 15:34:30,459] {base_hook.py:53} INFO - Using connection to:
> > ins-server.local
> >
> > d_daily_job_finished
> > d_daily_job_start
> > ...
> > pv_proc_ichiba_device_preference_rat
> >
> >
> >
> >
> >
> > Also, I added some airflow imports in "cdna_daily_dev.py" so that the
> > file is detected as a DAG file. Still, the DAGs are not displayed in
> > the UI.
> >
> >
> >
> >
> > ====================
> > cdna_daily_common.py
> > ====================
> > global airflow_host_port
> > global env
> > global alert_email
> > global spdb_sync_prefix
> > global post_validate_prefix
> > global schedule_interval
> > global dag_name  # PROD value: cdna_daily_prd
> > global job_execution_prefix
> > from airflow import DAG
> > from datetime import datetime, timedelta
> > from airflow.contrib.hooks import SSHHook
> > from airflow.operators import EmailOperator
> > from airflow.operators.bash_operator import BashOperator
> > from airflow.contrib.operators import SSHExecuteOperator
> > from airflow.operators.python_operator import PythonOperator
> >
> > global dag
> >
> > def initDAG():
> >     # define SSHHook
> >     sshHook = SSHHook()
> >     global dag
> >     dag = DAG(
> >         dag_id=dag_name,
> >         default_args=default_args,
> >         schedule_interval="0 2 * * *"
> >     )
> >     # ... create the rest of the DAG
> >
> >
> > def execDAG():
> >     global dag
> >     initDAG()
> >     dag.cli()
> >
> >
> >
> >
> > ==================
> > cdna_daily_dev.py
> > ==================
> > from airflow import DAG
> > import cdna_daily_common
> > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > cdna_daily_common.env='dev'
> > cdna_daily_common.alert_email='dev-dsd-mon@mail.com'
> > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > cdna_daily_common.post_validate_prefix='echo PostVal'
> > cdna_daily_common.schedule_interval='0 2 * * *'
> > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > cdna_daily_common.job_execution_prefix="python
> > /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> >
> > if __name__ == "__main__":
> >     cdna_daily_common.execDAG()
> >
> >
> > On Tue, Jun 27, 2017 at 3:18 PM, Gerard Toonstra <gtoonstra@gmail.com>
> > wrote:
> >
> > > For airflow to find dags, a .py file is read. The file should contain
> > > either "DAG" or "airflow" somewhere to be considered a potential dag
> > > file. Then there are some additional rules for whether this actually
> > > gets scheduled or not. The log files for the dag file processors are
> > > not the same as the main scheduler's, so you could look at those
> > > explicitly and see if there are reasons the dags get rejected.
> > >
> > >
> > > You can find this code in "dag_processing.list_by_file_path()", which
> > > detects potential dags from the files it finds. This creates a list
> > > of files that should be considered, which potentially contain a dag
> > > for scheduling.
> > >
> > >
> > > Then there's models.DagBag.process_file, which is called later and
> > > actually attempts to parse the files and create DAG objects from them.
> > >
> > >
> > > From your code, I think you've already noticed this page on the subject:
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
> > >
> > >
> > > Hope that helps!
> > >
> > >
> > > Rgds,
> > >
> > > Gerard
> > >
> > >
> > >
> > >
> > > On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
> > > umanga.pdn@gmail.com> wrote:
> > >
> > > > Greetings,
> > > >
> > > > We are trying to extract the common DAG creation code into a single
> > > > source file and create a separate file for each environment,
> > > > importing this "common logic".
> > > >
> > > > "cdna_daily_common.py" has the common DAG creation code. Files for
> > > > different environments are like "cdna_daily_dev.py",
> > > > "cdna_daily_stg.py", etc.
> > > >
> > > > However, when I copy these files into the "dags" folder, they don't
> > > > show up in the Airflow UI. How can I make these DAGs visible in the
> > > > Airflow UI while following this coding convention?
> > > >
> > > >
> > > > ====================
> > > > cdna_daily_common.py
> > > > ====================
> > > > global airflow_host_port
> > > > global env
> > > > global alert_email
> > > > global spdb_sync_prefix
> > > > global post_validate_prefix
> > > > global schedule_interval
> > > > global dag_name  # PROD value: cdna_daily_prd
> > > > global job_execution_prefix
> > > >
> > > > global dag
> > > >
> > > > def initDAG():
> > > >     # define SSHHook
> > > >     sshHook = SSHHook()
> > > >     global dag
> > > >     dag = DAG(
> > > >         dag_id=dag_name,
> > > >         default_args=default_args,
> > > >         schedule_interval="0 2 * * *"
> > > >     )
> > > >     # ... create the rest of the DAG
> > > >
> > > >
> > > > def execDAG():
> > > >     initDAG()
> > > >     dag.cli()
> > > >
> > > >
> > > >
> > > >
> > > > ==================
> > > > cdna_daily_dev.py
> > > > ==================
> > > > import cdna_daily_common
> > > > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > > > cdna_daily_common.env='dev'
> > > > cdna_daily_common.alert_email='dev-dsd-mon@mail.com'
> > > > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > > > cdna_daily_common.post_validate_prefix='echo PostVal'
> > > > cdna_daily_common.schedule_interval='0 2 * * *'
> > > > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > > > cdna_daily_common.job_execution_prefix="python
> > > > /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> > > >
> > > > if __name__ == "__main__":
> > > >     cdna_daily_common.execDAG()
> > > >
> > >
> >
> >
> >
> > --
> > Umanga
> > http://jp.linkedin.com/in/umanga
> > http://umanga.ifreepages.com
> >
>



-- 
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com
