airflow-dev mailing list archives

From Andrew Maguire <andrewm4...@gmail.com>
Subject Re: Simple DAG Structure
Date Mon, 23 Jul 2018 17:58:56 GMT
Maybe you could have A and B report their outcome somewhere, and then have a
check operator in C read that output back in and verify it.

This is kind of reinventing the wheel a little bit, though, as ideally there
would be a way to keep all that state inside Airflow.

I think what I suggest would work, but it may be a little hackish.
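
For example, a minimal sketch of that idea using XCom as the "somewhere"
(reading the outcome back from a file or a database would work just as well).
Everything here is hypothetical: it assumes a dag object and tasks a, b and c
already exist, and that A and B each push an outcome, e.g. by returning
'success' from a PythonOperator:

from airflow.operators.python_operator import PythonOperator

def check_outcomes(**context):
    # read back whatever A and B reported (here, their XCom return values)
    outcomes = context['ti'].xcom_pull(task_ids=['a', 'b'])
    if any(outcome != 'success' for outcome in outcomes):
        raise ValueError('A or B did not report success')

# gate task: C only runs once the check passes
check_ab = PythonOperator(task_id='check_ab',
                          python_callable=check_outcomes,
                          provide_context=True,
                          dag=dag)
check_ab.set_upstream([a, b])
c.set_upstream(check_ab)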

On Mon, 23 Jul 2018 at 14:33, srinivas.ramabhadran@gmail.com <
srinivas.ramabhadran@gmail.com> wrote:

> Carl -
>
>    Thanks, that definitely works, but it's non-ideal. If I had hundreds of
> jobs running throughout the day, a TimeSensor task (process) gets created
> for each task at midnight, even though a task may not be required to run
> until much later (e.g. a whole bunch of tasks need to run at 20:00, yet all
> of their time sensors are kicked off at 00:00). Worse still, if I used a
> LocalExecutor with a pool size of 10, some jobs that need to run early may
> never get scheduled, crowded out by time sensors for tasks later in the day
> that only perform a sleep operation.
>
>    Is there another way to do this? If not, is there at least another way
> around the LocalExecutor problem?
>
> Ram.
>
>
> On 2018/07/23 08:23:45, Carl Johan Gustavsson <carl.j.gustavsson@gmail.com>
> wrote:
> > Hi Ram,
> >
> > You can have a single DAG scheduled for 10 am, which starts A, then use a
> > TimeSensor set to 11 am that B depends on, and then have C depend on both
> > A and B.
> >
> > Something like:
> >
> > from datetime import time
> > from airflow.operators.bash_operator import BashOperator
> > from airflow.operators.sensors import TimeSensor
> >
> > a = BashOperator(task_id='a', …)
> >
> > # B waits on a sensor that fires at 11:00
> > delay_b = TimeSensor(task_id='delay_b', target_time=time(11, 0, 0), …)
> > b = BashOperator(task_id='b', …)
> > b.set_upstream(delay_b)
> >
> > # C runs only after both A and B have succeeded
> > c = BashOperator(task_id='c', …)
> > c.set_upstream(a)
> > c.set_upstream(b)
> >
> >
> > / Carl Johan
> > On 23 July 2018 at 02:18:00, srinivas.ramabhadran@gmail.com (
> srinivas.ramabhadran@gmail.com) wrote:
> >
> > Hi -
> >
> > I have recently started using Airflow version 1.9.0 and am having some
> difficulty setting up a very simple DAG. I have three tasks A, B and C. I'd
> like A to run every day at 10am and B at 11am. C depends on BOTH A and B
> running successfully.
> >
> > Initially, I decided to create one DAG, add all three tasks to it and
> > set C as downstream of A and B. I then set the schedule_interval of the
> > DAG to @daily. But this meant I couldn't run A and B at 10am and 11am
> > respectively, since they are PythonOperators and tasks don't support
> > schedule_interval (or, at least, it's deprecated syntax that gets
> > ignored).
> >
> > I scratched that idea and then created A and B as separate DAGs, with
> > schedule intervals specified in cron syntax: '00 10 * * *' for A and
> > '00 11 * * *' for B. But now when I set C as a downstream of A and B, it
> > complains that C can't belong to two different DAGs.
> >
> > How do I accomplish such a simple dependency structure?
> >
> > Ram.
> >
>
