airflow-dev mailing list archives

From Ashika Umanga Umagiliya <umanga....@gmail.com>
Subject Re: Split DAG code into several files.
Date Tue, 27 Jun 2017 06:38:32 GMT
Thanks for the reply.

When I execute "python cdna_daily_dev.py list_tasks" it gives the correct
output, as expected.
Does this mean that my DAG is valid?


[bisuser@ins-server dags]$ python cdna_daily_dev.py list_tasks
[2017-06-27 15:34:29,785] {__init__.py:36} INFO - Using executor
CeleryExecutor
[2017-06-27 15:34:30,459] {base_hook.py:53} INFO - Using connection to:
ins-server.local

d_daily_job_finished
d_daily_job_start
...
pv_proc_ichiba_device_preference_rat





Also, I added some airflow imports to "cdna_daily_dev.py" so that the file
is detected as a DAG file. Still, the DAGs are not displayed in the UI.
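For reference, that detection step can be sketched as a simple substring scan over each .py file. This is only an approximation of what the Airflow file processor does; the exact strings and casing it checks may vary between versions:

```python
def might_contain_dag(path):
    """Approximate Airflow's candidate-DAG-file check: a .py file is
    only handed to the DAG parser if its text mentions both
    'airflow' and 'dag' (checked here case-insensitively)."""
    if not path.endswith('.py'):
        return False
    with open(path) as f:
        content = f.read().lower()
    return 'airflow' in content and 'dag' in content
```

Passing this check only makes the file a candidate; the DAGs still have to be importable at module level for the scheduler to register them.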




====================
cdna_daily_common.py
====================
global airflow_host_port
global env
global alert_email
global spdb_sync_prefix
global post_validate_prefix
global schedule_interval
global dag_name#PROD cdna_daily_prd
global job_execution_prefix
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks import SSHHook
from airflow.operators import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.python_operator import PythonOperator

global dag

def initDAG():
    # define SSHHook
    sshHook = SSHHook()
    global dag
    dag = DAG(
        dag_id=dag_name,
        default_args=default_args,
        schedule_interval="0 2 * * *"
    )
    # ... create the rest of the DAG (operators, dependencies)


def execDAG():
    global dag
    initDAG()
    dag.cli()




==================
cdna_daily_dev.py
==================
from airflow import DAG
import cdna_daily_common
cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
cdna_daily_common.env='dev'
cdna_daily_common.alert_email='dev-dsd-mon@mail.com'
cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
cdna_daily_common.post_validate_prefix='echo PostVal'
cdna_daily_common.schedule_interval='0 2 * * *'
cdna_daily_common.dag_name='cdna_daily_dev_v2'
cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"

if __name__ == "__main__":
    cdna_daily_common.execDAG()
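A likely cause of the missing DAGs: execDAG() only runs under the `if __name__ == "__main__"` guard, but the scheduler *imports* each dag file rather than running it as a script, so `dag` is never created at module level and the DagBag finds nothing. A small self-contained demonstration of that import behavior (the module source below is a simplified stand-in for the real files):

```python
import importlib.util
import os
import tempfile

# A stand-in for cdna_daily_dev.py: the dag is only built when the
# file is run as a script, never when it is merely imported.
src = '''
dag = None

def init_dag():
    global dag
    dag = "cdna_daily_dev_v2"

if __name__ == "__main__":
    init_dag()
'''

path = os.path.join(tempfile.mkdtemp(), "cdna_daily_dev.py")
with open(path, "w") as f:
    f.write(src)

# Import the file the way the scheduler does (as a module, not __main__):
spec = importlib.util.spec_from_file_location("cdna_daily_dev", path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

# The __main__ guard never fired, so there is no DAG to collect:
print(mod.dag)  # -> None
```

The fix, then, would be to call initDAG() unconditionally at import time in each environment file (after setting the environment variables) and expose the resulting object as a module-level `dag`, keeping the `__main__` block only for `dag.cli()`.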


On Tue, Jun 27, 2017 at 3:18 PM, Gerard Toonstra <gtoonstra@gmail.com>
wrote:

> For airflow to find dags, a .py file is read. The file should contain
> either "DAG" or "airflow" somewhere to be considered a potential dag file.
> Then there are some additional rules on whether this actually gets scheduled
> or not. The log files for the dag file processors are not the same
> as the main scheduler's, so you could look for those explicitly and see if
> there are reasons the dags get rejected.
>
>
> You can find this code in "dag_processing.list_by_file_path()", which
> detects potential dags from files it finds.
> This creates a list of files that should be considered, which potentially
> have a dag for scheduling.
>
>
> Then there's models.DagBag.process_file, which is called later and
> actually attempts to parse the dags, creating DAG objects from those files.
>
>
> From your code, I think you've already noticed this page on the subject:
> https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
>
>
> Hope that helps!
>
>
> Rgds,
>
> Gerard
>
>
>
>
> On Tue, Jun 27, 2017 at 7:09 AM, Ashika Umanga Umagiliya <
> umanga.pdn@gmail.com> wrote:
>
> > Greetings ,
> >
> > We are trying to extract common DAG creation code into a single source
> > file and create a separate file for each environment, importing this
> > "common logic".
> >
> > "cdna_daily_common.py" has the common DAG creation code. Files for
> > different environments are "cdna_daily_dev.py", "cdna_daily_stg.py", etc.
> >
> > However, when I copy these files into the "dags" folder, they don't show
> > up in the Airflow UI. How can I make these DAGs visible in the Airflow UI
> > while following this coding convention?
> >
> >
> > ====================
> > cdna_daily_common.py
> > ====================
> > global airflow_host_port
> > global env
> > global alert_email
> > global spdb_sync_prefix
> > global post_validate_prefix
> > global schedule_interval
> > global dag_name#PROD cdna_daily_prd
> > global job_execution_prefix
> >
> > global dag
> >
> > def initDAG():
> >     # define SSHHook
> >     sshHook = SSHHook()
> >     global dag
> >     dag = DAG(
> >         dag_id=dag_name,
> >         default_args=default_args,
> >         schedule_interval="0 2 * * *"
> >     )
> >     # ... create the rest of the DAG (operators, dependencies)
> >
> >
> > def execDAG():
> >     initDAG()
> >     dag.cli()
> >
> >
> >
> >
> > ==================
> > cdna_daily_dev.py
> > ==================
> > import cdna_daily_common
> > cdna_daily_common.airflow_host_port='hostnme.jp.local:9553'
> > cdna_daily_common.env='dev'
> > cdna_daily_common.alert_email='dev-dsd-mon@mail.com'
> > cdna_daily_common.spdb_sync_prefix='echo SPDBSync'
> > cdna_daily_common.post_validate_prefix='echo PostVal'
> > cdna_daily_common.schedule_interval='0 2 * * *'
> > cdna_daily_common.dag_name='cdna_daily_dev_v2'
> > cdna_daily_common.job_execution_prefix="python /home/bisuser/bo_dir/repos/customer_dna/py/job_execution.py -j"
> >
> > if __name__ == "__main__":
> >     cdna_daily_common.execDAG()
> >
>



-- 
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com
