airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Frank Maritato <fmarit...@opentable.com.INVALID>
Subject Dynamic tasks in a dag?
Date Fri, 14 Sep 2018 16:05:08 GMT
Hi,

I'm using apache airflow 1.10.0 and I'm trying to dynamically generate some tasks in my dag
based on files that are in the dags directory. The problem is, I don't see these tasks in
the ui, I just see the 'start' dummy operator. If I type 'airflow list_tasks workflow', they
are listed. Thoughts?

Here is how I'm generating the tasks:


def create_snowflake_operator(file, dag, snowflake_connection):
    file_repl = file.replace('/', '_')
    file_repl = file_repl.replace('.sql', '')
    print("TASK_ID {}".format(file_repl))
    return SnowflakeOperator(
        dag=dag,
        task_id='create_{}'.format(file_repl),
        snowflake_conn_id=snowflake_connection,
        sql=file
    )

DAG_NAME = 'create_objects'
dag = DAG(
    DAG_NAME,
    default_args=args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None,
)

start = DummyOperator(
    dag=dag,
    task_id="start",
)

print("creating snowflake operators")

for file in glob('dags/snowsql/create/udf/*.sql'):
    print("FILE {}".format(file))
    task = create_snowflake_operator(file, dag, 'snowflake_default')
    task.set_upstream(start)

for file in glob('dags/snowsql/create/table/*.sql'):
    print("FILE {}".format(file))
    task = create_snowflake_operator(file, dag, 'snowflake_default')
    task.set_upstream(start)

for file in glob('dags/snowsql/create/view/*.sql'):
    print("FILE {}".format(file))
    task = create_snowflake_operator(file, dag, 'snowflake_default')
    task.set_upstream(start)

print("done {}".format(start.downstream_task_ids))

Thanks in advance
--
Frank Maritato
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message