airflow-dev mailing list archives

From Adinata <mail.die...@gmail.com>
Subject Re: Dynamically defining tasks in a DAG -- HOW?
Date Fri, 09 Sep 2016 02:24:23 GMT
Hi,

I also have this kind of use case. We need to generate reports for many
users, where the selected users come from specific queries.

I did something like this:

[image: Inline image 1]

With the relevant DAG definition:

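# Assumes `from airflow.models import Variable`, and that `dag`, `task_id`,
# the two surrounding tasks, and ExecuteFannedOutOperator (not an Airflow
# built-in) are defined elsewhere in the DAG file.
# The worker count is read whenever the DAG file is parsed (default: 5).
for i in range(int(Variable.get('weekly-mailer-worker', '5'))):
    send_email = ExecuteFannedOutOperator(
        task_id='execute-%s-%s' % (task_id, i),
        dag=dag,
        tolerance=100
    )
    # Each worker runs after the start task and before the cleanup task.
    start_weekly_mailer >> send_email
    send_email >> post_execute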

start-weekly-mailer runs the queries and then sends the task definitions to
a queue (we used SQS). Each execute-weekly-mailer task pulls task
definitions from the queue and keeps running until the queue is empty.
The post-execute task is mostly for cleanup.
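
The worker side of this pattern can be sketched roughly as follows. This is
only an illustration, not our actual operator: it uses the standard-library
queue.Queue as a stand-in for SQS, and the names drain_queue, handle, and
the user IDs are made up for the example.

```python
import queue


def drain_queue(task_queue, handle):
    """Pull task definitions until the queue is empty.

    Returns the number of task definitions that were handled.
    """
    handled = 0
    while True:
        try:
            # Non-blocking read: raises queue.Empty once nothing is left.
            task = task_queue.get_nowait()
        except queue.Empty:
            break
        handle(task)
        handled += 1
    return handled


# "Start" step: enqueue one task definition per selected user
# (hypothetical user IDs, standing in for the query results).
task_queue = queue.Queue()
for user_id in [101, 102, 103]:
    task_queue.put({"user_id": user_id})

# "Execute" step: a worker drains the queue, here just recording each user.
sent = []
count = drain_queue(task_queue, lambda task: sent.append(task["user_id"]))
```

Several such workers can read from the same queue, so adding more of them
only changes how quickly the queue is emptied, not which work gets done.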

As you can see in the DAG definition, I use a Variable, so I can increase
the number of workers from the webserver (I can't decrease already running
tasks because, you know, this is a hack). I think you can also set the
Variable in start-weekly-mailer to define how many parallel workers are
needed, e.g. based on the number of rows returned.
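
As a rough sketch of that last idea (I have not run it in production), the
start task could derive a worker count from the row count like this. The
function name worker_count and the rows_per_worker and max_workers limits
are assumptions chosen for the example.

```python
import math


def worker_count(row_count, rows_per_worker=1000, max_workers=20):
    """Pick a number of parallel workers for a given number of result rows.

    Always returns at least 1 and never more than max_workers.
    """
    if row_count <= 0:
        return 1
    needed = math.ceil(row_count / rows_per_worker)
    return max(1, min(max_workers, needed))


# Inside the start task, the result could then be stored for the next
# DAG parse, e.g. (assuming `from airflow.models import Variable`):
#   Variable.set('weekly-mailer-worker', str(worker_count(len(rows))))
workers_for_small_run = worker_count(2500)
```

The new value only changes the number of tasks once the DAG file is parsed
again, so it would take effect for later parses rather than for tasks that
already exist.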

I hope we can improve Airflow to also handle this kind of use case.


--
*Adinata*
TOKI 2009
SMAN Plus Provinsi Riau 9th
13509022 - Informatika ITB 2009
Engineer - UrbanIndo.com

On Fri, Sep 9, 2016 at 6:12 AM, J C Lawrence <claw@kanga.nu> wrote:

> On Thu, 8 Sep 2016 14:04:58 -0700, Ben Tallman <ben@apigee.com> wrote:
>
> > We have done this a lot, and the one issue is that every time the DAG
> > is evaluated (even during a run), the SQL will be re-run, and tasks
> > can vary. In fact, we had a select statement that actually marked
> > items as in process during select, and THAT was bad.
>
> Yeah, I'm keeping an eye on that.
>
> The problem I'm having however is that the DAGs are not getting
> populated with the tasks relevant to that specific scheduling run.  Do
> you have this working under Airflow today?
>
> > We have moved to x number of tasks, and each one grabs a line from
> > the DB, and 0 to n of them can actually get skipped if they don't get
> > a line from the DB.
>
> Yeah, I just don't want to a) setup yet another DB/table for an
> interstitial process or b) to re-invent dispatch/locking against a DB
> yet again.
>
> -- JCL
>
