airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeremiah Lowin <>
Subject Re: AIRFLOW-20: Improving the scheduler by make dag runs more coherent
Date Mon, 02 May 2016 11:48:42 GMT
Well done Bolke and thanks for all the work on this! Looking forward to
kicking the tires later today.

After last week's conversations I understand why users would want a run_id
but I'm not totally sold that (dag_id, execution_date) is an insufficient
primary key for a DagRun. My reasoning is this: if (dag_id, execution_date)
is a bad primary key, then there must be times we violate it. But under
what circumstances would we ever want two database entries for the state of
the same DAG on the same execution_date? It seems to me that if we update
an already-known DagRun state -- for example, by backfilling over a
scheduled run or running backfill twice -- then we are *updating* the
existing DagRun rather than *creating* a new DagRun, and so the primary key
uniqueness is maintained.

Bolke proposed having a larger (realtime) conversation around this and I
think it's a great idea. Perhaps we could do that this week? As you all
know I'm a big proponent of refactoring this whole area and on the whole
I'm hugely supportive of this work -- I just have this one outstanding
question as outlined above.

On Sun, May 1, 2016 at 4:15 PM Bolke de Bruin <> wrote:

> Done!
> Bolke
> > Op 1 mei 2016, om 21:52 heeft Siddharth Anand <>
> het volgende geschreven:
> >
> > Bolke,
> > Thanks for taking the lead on this. Do you mind creating a confluence
> wiki page with this proposal, so that we can link email, jiras, and the
> signal to it. We will need a roadmap linked to our main site as well.
> >
> > Sent from Sid's iPhone
> >
> >> On May 1, 2016, at 12:45 PM, Bolke de Bruin <> wrote:
> >>
> >> Hi,
> >>
> >> A lot of discussion has been around scheduler issues. Some of the
> symptoms are requiring the start_date to align with the (cron)interval, not
> being able to update a DAG with a new interval, backfills interfering with
> normal dag runs, unable to version dags. Work arounds also have been
> provided eg., ignore_first_depends_on_past. Many issues on github have been
> created with questions around why does the scheduler do this and not this.
> We can hardly say it is working intuitively at the moment.
> >>
> >> The following is a proposal for an update to the scheduler while
> remaining 99% backwards compatible. Code (PR-1431) is fully working and all
> tests pass on travis. Now I am looking for real world testers, feedback -
> Paul I understood you are also working on something on the scheduler? -
> etc. :-).
> >>
> >>
> >> Model changes
> >> - TaskInstances now always have an associated DagRun
> >> - TaskInstances can still be run without a real DagRun but they need
> the special “-1” DagRun id.
> >> - Backfills also create DagRuns
> >>
> >> Scheduler
> >> - Scheduler is aware of backfills
> >> - Scheduler will fast forward beyond backfill if needed while
> maintaining the correct interval
> >> - Scheduler will adjust start date for interval (no more aligning
> needed)
> >>
> >> Backfills
> >> - Backfills can update the past, but will maintain lineage
> >> - Backfills insert themselves in between scheduled dag runs if needed,
> making depend_on_past work with arbitrary inserted backfills
> >>
> >> Where might there be issues:
> >> - If you are creating TaskInstances inside DAGs yourself or in
> Operators you might need to update those to take into account a dag_run_id.
> For Operators have a look at the PythonOperator how to update it.
> >> - If you are issuing “airflow run xxx” *yourself*, thus outside
> airflow: for now Airflow will set your dag_run_id to -1, but is this really
> what you want? Please let me know the use cases and discuss.
> >> - For depends_on_past I check on the DagRun level, it might be required
> to move this to the TaskInstance level. Not a big change and we can even
> support both!
> >>
> >> If you consider DAGs a description of coherence work and Tasks (as
> defined in Airflow) a description of work, then DagRuns are the
> instantiation of DAGS in a point of time. TaskInstances are then the
> instantiations of Tasks in a point of time. To maintain the coherence
> between TaskInstances a DagRun needs to be aware of the TaskInstances it
> has. So far so good. Airflow does this quite well and as long as you don’t
> try to do something fancy like updating you interval with depend_on_past
> your tasks will run happily. Now enter backfills. Backfills allow you to
> either create or alter history. It does this by arbitrarily inserting tasks
> into the time line and completely disregarding the scheduler. Also the
> scheduler does not know about backfills leading to other issues.
> >>
> >> If we would maintain a time line of DagRuns this would solve the above
> mentioned issues, improve lineage and pave the way for DAG versioning. It
> would also simplify the code in the future by moving a lot of logic to
> DagRun. While one might argue that currently  “dag_id + execution_date”
> already does this for both DagRun and TaskInstances this foregoes the issue
> that backfills create in the scheduler, in lineage and in versioning in
> addition this would not allow you to solve the moving interval easily. With
> backfills you would be able to run an updated dag that changes the past,
> but how to answer what version of the DAG you ran when? Depending on past
> in a backfill with a new task is also quite hard.
> >>
> >> DagRuns now maintain a “previous” property. Previous points to the
> previous dag run by id, if previous is None it is considered the first
> DagRun. The scheduler will set the previous property if it detects a
> previous run. A previous run can be a scheduled run or a backfill run. If
> will adhere to the schedule_interval, but it will fast forward beyond the
> latest execution date of either the last scheduled run or last backfill run
> whatever comes later. So at the moment the scheduler will not fill in the
> blanks for you if there is a gap between the scheduled run and a backfill
> run of more than one interval.
> >>
> >> Backfill will create a DagRun and insert it into the timeline, ie. it
> will update the previous property of a scheduled DagRun if it is set in the
> future seen from the Backfill DagRun. If it encounters a DagRun at the same
> execution date for the same dag_id it will re-own the tasksinstances and
> set the state of the other DagRun to “overridden”, essentially orphaning
> the other DagRun but keeping its record around for auditing purposes.
> >>
> >> TaskInstances now maintain a reference to a DagRun, which cannot be
> None. If it is created with None it will be set to the special “-1” DagRun
> ID. DagRun IDs that are “None” will lead to an integrity error at the
> database level. “-1” DagRun ID TaskInstances are treated as they are now.
> However Airflow itself will not create such TaskInstances anymore so they
> come from outside. In the context of the DAGS being the description of
> “coherence of work” I have a lot of difficulty understanding having work
> available that is not coherent ;-).
> >>
> >>
> >> Bolke
> >>
> >>
> >>
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message