airflow-dev mailing list archives

From Frank Maritato <fmarit...@opentable.com.INVALID>
Subject Re: [External] Re: Dynamic tasks in a dag?
Date Fri, 14 Sep 2018 17:19:54 GMT
Ok, my mistake. I thought that command was querying the server for its information and not
just looking in a directory relative to where it is being run. I have it working now. Thanks
Chris and Sai!
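(A minimal sketch of the fix being described, for reference: anchor the glob
to the DAG file's own directory via __file__ instead of the working
directory. The DAG_DIR name is illustrative, and the sketch assumes the .sql
files live under the same directory as the DAG file.)

    import os
    from glob import glob

    # Resolve the SQL directory relative to this DAG file, so the scheduler,
    # webserver, and CLI all find the same files no matter where the process
    # was started (assumes the files sit next to the DAG file).
    DAG_DIR = os.path.dirname(os.path.abspath(__file__))

    for path in glob(os.path.join(DAG_DIR, 'snowsql', 'create', 'udf', '*.sql')):
        # Hand the operator a DAG-folder-relative path again, so task_id
        # generation and .sql template lookup behave as in the original code.
        sql_file = os.path.relpath(path, DAG_DIR)
        task = create_snowflake_operator(sql_file, dag, 'snowflake_default')
        task.set_upstream(start)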


On 9/14/18, 9:58 AM, "Chris Palmer" <chris@crpalmer.com> wrote:

    The relative paths might work from wherever you are invoking 'airflow
    list_tasks', but that doesn't mean they work from wherever the webserver is
    parsing the dags from.
    
    Does running 'airflow list_tasks' from some other working directory work?
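
    (To illustrate the point: the same relative glob resolves against
    whatever the current working directory happens to be, so different
    components can see different files. The directory names below are
    hypothetical.)

    import os
    from glob import glob

    os.chdir('/home/frank/airflow')                 # e.g. where 'airflow list_tasks' was run
    print(glob('dags/snowsql/create/udf/*.sql'))    # finds the .sql files

    os.chdir('/')                                   # e.g. the webserver's working directory
    print(glob('dags/snowsql/create/udf/*.sql'))    # [] -> no tasks appear in the UI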
    
    On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
    <fmaritato@opentable.com.invalid> wrote:
    
    > Do you mean give the full path to the files? The relative path I'm using
    > definitely works. When I run 'airflow list_dags', I can see from the
    > print statements that the glob is finding my sql files and creating the
    > snowflake operators.
    >
    > 'airflow list_tasks workflow' also lists all the operators I'm creating.
    > I'm just not seeing them in the UI.
    >
    > On 9/14/18, 9:10 AM, "Sai Phanindhra" <phani8996@gmail.com> wrote:
    >
    >     Hi Frank,
    >     Can you try giving global paths?
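    >
    >     (For illustration, "global" presumably meaning absolute paths; the
    >     install location below is hypothetical:)
    >
    >     # e.g. instead of glob('dags/snowsql/create/udf/*.sql'):
    >     for file in glob('/home/frank/airflow/dags/snowsql/create/udf/*.sql'):
    >         task = create_snowflake_operator(file, dag, 'snowflake_default')
    >         task.set_upstream(start)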
    >
    >     On Fri 14 Sep, 2018, 21:35 Frank Maritato <fmaritato@opentable.com.invalid>
    >     wrote:
    >
    >     > Hi,
    >     >
    >     > I'm using Apache Airflow 1.10.0 and I'm trying to dynamically
    >     > generate some tasks in my dag based on files that are in the dags
    >     > directory. The problem is that I don't see these tasks in the UI;
    >     > I just see the 'start' dummy operator. If I run 'airflow list_tasks
    >     > workflow', they are listed. Thoughts?
    >     >
    >     > Here is how I'm generating the tasks:
    >     >
    >     >
    >     > # Imports used below (paths as of Airflow 1.10; not shown in the
    >     > # original message):
    >     > from datetime import datetime, timedelta
    >     > from glob import glob
    >     >
    >     > from airflow import DAG
    >     > from airflow.operators.dummy_operator import DummyOperator
    >     > from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
    >     >
    >     > # Build one SnowflakeOperator per .sql file; the task_id is derived
    >     > # from the file path.
    >     > def create_snowflake_operator(file, dag, snowflake_connection):
    >     >     file_repl = file.replace('/', '_')
    >     >     file_repl = file_repl.replace('.sql', '')
    >     >     print("TASK_ID {}".format(file_repl))
    >     >     return SnowflakeOperator(
    >     >         dag=dag,
    >     >         task_id='create_{}'.format(file_repl),
    >     >         snowflake_conn_id=snowflake_connection,
    >     >         sql=file
    >     >     )
    >     >
    >     > # 'args' was defined elsewhere in the original message; a minimal
    >     > # assumed stand-in so the snippet is self-contained:
    >     > args = {'owner': 'airflow', 'start_date': datetime(2018, 9, 1)}
    >     >
    >     > DAG_NAME = 'create_objects'
    >     > dag = DAG(
    >     >     DAG_NAME,
    >     >     default_args=args,
    >     >     dagrun_timeout=timedelta(hours=2),
    >     >     schedule_interval=None,
    >     > )
    >     >
    >     > start = DummyOperator(
    >     >     dag=dag,
    >     >     task_id="start",
    >     > )
    >     >
    >     > print("creating snowflake operators")
    >     >
    >     > # NOTE: a relative glob like this resolves against the process's
    >     > # current working directory (the root cause discussed above).
    >     > for file in glob('dags/snowsql/create/udf/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > for file in glob('dags/snowsql/create/table/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > for file in glob('dags/snowsql/create/view/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > print("done {}".format(start.downstream_task_ids))
    >     >
    >     > Thanks in advance
    >     > --
    >     > Frank Maritato
    >     >
    >
    >
    >
    
