airflow-dev mailing list archives

From Ben Tallman <...@apigee.com>
Subject Re: Dynamically defining tasks in a DAG -- HOW?
Date Thu, 08 Sep 2016 21:04:58 GMT
We have done this a lot, and the one issue is that every time the DAG file
is evaluated (even in the middle of a run), the SQL will be re-run, and the
set of tasks can vary. In fact, we once had a select statement that marked
items as in-process as a side effect of the select, and THAT was bad.

We have moved to a fixed pool of x tasks; each one grabs a line from the
DB, and 0 to n of them simply skip themselves if they don't get a line
from the DB.
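
The fixed-pool pattern can be sketched outside Airflow. The following is a minimal illustration using sqlite3; the table name ("files"), column names, and states here are hypothetical, and in a real DAG each worker would be a PythonOperator that skips (e.g. by raising AirflowSkipException) when it gets None back:

```python
import sqlite3

def claim_next_file(conn):
    """Atomically claim one unprocessed row; return its file_id, or None.

    The state check repeated in the UPDATE's WHERE clause makes the claim
    safe: if another worker claimed the row first, rowcount is 0 and we
    retry with the next candidate row.
    """
    cur = conn.cursor()
    while True:
        row = cur.execute(
            "SELECT file_id FROM files WHERE state = 'new' LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # nothing left -- this worker skips itself
        fid = row[0]
        cur.execute(
            "UPDATE files SET state = 'claimed' "
            "WHERE file_id = ? AND state = 'new'",
            (fid,),
        )
        conn.commit()
        if cur.rowcount == 1:
            return fid  # claim succeeded

# Three fixed "tasks" contend for two rows: one comes up empty and skips.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE files (file_id INTEGER PRIMARY KEY, state TEXT)")
conn.executemany("INSERT INTO files VALUES (?, 'new')", [(1,), (2,)])
claims = [claim_next_file(conn) for _ in range(3)]
print(claims)
```

The task list is now constant from the scheduler's point of view; only the work each task picks up varies per run.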

To be clear, we would really like the DAG's tasks to be frozen at schedule
time, but that has not been our experience, and I believe it would take a
fairly major refactor. As things stand, the DAG definition is re-evaluated
during runtime, so the set of tasks is not determinate until the run
completes.


Thanks,
Ben

*--*
*ben tallman* | *apigee <http://www.apigee.com/>*
 | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage
<http://twitter.com/anonymousmanage> @apigee <https://twitter.com/apigee>

On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <claw@kanga.nu> wrote:

> I have a few hundred thousand files arriving from an external service
> each day and would like to ETL their contents into my store with
> Airflow.  As the files are large and numerous and slow to process, I'd
> also like to process them in parallel...so I thought something like
> this:
>
>     def sub_dag (
>         parent_dag_name,
>         child_dag_name,
>         start_date,
>         schedule_interval):
>       dag = DAG(
>         "%s.%s" % (parent_dag_name, child_dag_name),
>         schedule_interval = schedule_interval,
>         start_date = start_date,
>       )
>       fan_out = operators.DummyOperator(
>         task_id = "fan_out",
>         dag = dag,
>       )
>       fan_in = operators.DummyOperator(
>         task_id = "fan_in",
>         dag = dag,
>       )
>       cur = hooks.PostgresHook("MY_DB").get_cursor()
>       cur.execute("""SELECT file_id
>                      FROM some_table
>                      WHERE something;""".format(foo=func(start_date)))
>       for rec in cur:
>         fid = rec[0]
>         o = operators.PythonOperator(
>           task_id = "ImportThing__%s" % fid,
>           provide_context = True,
>           python_callable = import_func,
>           params = {"file_id": fid},
>           dag = dag)
>         o.set_upstream(fan_out)
>         o.set_downstream(fan_in)
>       cur.close()
>       return dag
>
> The idea being that the number and identity of the tasks in the sub-DAG
> would vary dynamically depending on which day it is running for (i.e.
> which rows come back from the query for that day). But...no, this
> doesn't seem to work.
>
> Any recommendations for how to approach this?
>
> -- JCL
>
