airflow-dev mailing list archives

From Bolke de Bruin <>
Subject Re: AIRFLOW-20: Improving the scheduler by make dag runs more coherent
Date Sun, 01 May 2016 20:15:29 GMT


> On 1 May 2016, at 21:52, Siddharth Anand <> wrote:
> Bolke,
> Thanks for taking the lead on this. Do you mind creating a Confluence wiki page with this proposal, so that we can link email, JIRAs, and the signal to it? We will need a roadmap linked to our main site as well.
>> On May 1, 2016, at 12:45 PM, Bolke de Bruin <> wrote:
>> Hi,
>> A lot of discussion has been around scheduler issues. Symptoms include having to align the start_date with the (cron) interval, not being able to update a DAG with a new interval, backfills interfering with normal DAG runs, and being unable to version DAGs. Workarounds have also been provided, e.g. ignore_first_depends_on_past. Many issues on GitHub ask why the scheduler does one thing and not another. We can hardly say it is working intuitively at the moment.
>> The following is a proposal to update the scheduler while remaining 99% backwards compatible. The code (PR-1431) is fully working and all tests pass on Travis. Now I am looking for real-world testers and feedback (Paul, I understood you are also working on something in the scheduler?), etc. :-)
>> Model changes
>> - TaskInstances now always have an associated DagRun
>> - TaskInstances can still be run without a real DagRun, but they need the special “-1” DagRun ID.
>> - Backfills also create DagRuns
>> Scheduler
>> - Scheduler is aware of backfills
>> - Scheduler will fast forward beyond backfill if needed while maintaining the correct
>> - Scheduler will adjust start date for interval (no more aligning needed)
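To make "adjust start date for interval" concrete, here is a minimal sketch of one way a scheduler could align an arbitrary start_date to an interval instead of requiring the user to do it. It assumes a fixed timedelta interval whose boundaries are counted from the Unix epoch (so it does not handle cron expressions), and it assumes rounding *up*, so that no run is scheduled before the requested start date. The helper name `align_start_date` is hypothetical and not taken from PR-1431.

```python
from datetime import datetime, timedelta, timezone

# Assumption: interval boundaries are multiples of the interval from the epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def align_start_date(start_date: datetime, interval: timedelta) -> datetime:
    """Round start_date up to the next interval boundary (hypothetical helper).

    Works for fixed intervals such as hourly or daily; a cron-based
    schedule would need a cron parser instead.
    """
    remainder = (start_date - EPOCH) % interval
    if remainder == timedelta(0):
        return start_date  # already on a boundary, nothing to adjust
    return start_date + (interval - remainder)


if __name__ == "__main__":
    start = datetime(2016, 5, 1, 12, 45, tzinfo=timezone.utc)
    print(align_start_date(start, timedelta(days=1)))   # 2016-05-02 00:00:00+00:00
    print(align_start_date(start, timedelta(hours=1)))  # 2016-05-01 13:00:00+00:00
```

Rounding up rather than down is a design choice in this sketch: rounding down would create a run whose execution date precedes the start date the user asked for.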
>> Backfills
>> - Backfills can update the past, but will maintain lineage
>> - Backfills insert themselves in between scheduled DagRuns if needed, making depends_on_past work with arbitrarily inserted backfills
>> Where might there be issues:
>> - If you create TaskInstances yourself, inside DAGs or in Operators, you might need to update that code to take a dag_run_id into account. For Operators, have a look at the PythonOperator to see how to update them.
>> - If you issue “airflow run xxx” *yourself*, i.e. outside Airflow: for now Airflow will set your dag_run_id to -1, but is this really what you want? Please let me know your use cases so we can discuss them.
>> - For depends_on_past I check at the DagRun level; it might be necessary to move this to the TaskInstance level. That is not a big change, and we can even support both!
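The difference between checking depends_on_past at the DagRun level and at the TaskInstance level can be sketched as follows. This is an illustration under simplifying assumptions, not the code in PR-1431: the `DagRun` class, its `task_states` mapping and both function names are hypothetical. One further assumption is labeled in the code: a task that did not exist in the previous run (for example a newly added task) is treated as having no past to depend on.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

SUCCESS = "success"


@dataclass
class DagRun:
    """Hypothetical, simplified DagRun linked to its predecessor."""
    id: int
    previous: Optional["DagRun"]  # None means this is the first DagRun
    state: str
    # task_id -> state of that task's TaskInstance within this run
    task_states: Dict[str, str] = field(default_factory=dict)


def deps_met_dagrun_level(run: DagRun) -> bool:
    """DagRun level: the whole previous run must have succeeded."""
    if run.previous is None:
        return True
    return run.previous.state == SUCCESS


def deps_met_task_level(run: DagRun, task_id: str) -> bool:
    """TaskInstance level: only the same task in the previous run matters.

    Assumption: a task absent from the previous run has no past to wait for.
    """
    if run.previous is None:
        return True
    prev_state = run.previous.task_states.get(task_id)
    return prev_state is None or prev_state == SUCCESS
```

With a previous run that failed only because task `b` failed, the DagRun-level check blocks every task, while the TaskInstance-level check still lets task `a` proceed. Supporting both, as suggested above, would amount to choosing one of the two checks per DAG or per task.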
>> If you consider DAGs a description of coherent work and Tasks (as defined in Airflow) a description of work, then DagRuns are the instantiations of DAGs at a point in time. TaskInstances are then the instantiations of Tasks at a point in time. To maintain the coherence between TaskInstances, a DagRun needs to be aware of the TaskInstances it has. So far so good. Airflow does this quite well, and as long as you don’t try to do something fancy like updating your interval with depends_on_past, your tasks will run happily. Now enter backfills. Backfills allow you to either create or alter history. They do this by arbitrarily inserting tasks into the timeline, completely disregarding the scheduler. Also, the scheduler does not know about backfills, which leads to other issues.
>> If we maintained a timeline of DagRuns, this would solve the issues mentioned above, improve lineage and pave the way for DAG versioning. It would also simplify the code in the future by moving a lot of logic to DagRun. One might argue that “dag_id + execution_date” already does this for both DagRuns and TaskInstances, but that ignores the issues backfills create in the scheduler, in lineage and in versioning; in addition, it would not let you solve the changing-interval problem easily. With backfills you can run an updated DAG that changes the past, but how do you answer which version of the DAG you ran, and when? Depending on the past in a backfill with a new task is also quite hard.
>> DagRuns now maintain a “previous” property. Previous points to the previous DagRun by id; if previous is None, the DagRun is considered the first one. The scheduler will set the previous property if it detects a previous run, which can be either a scheduled run or a backfill run. It will adhere to the schedule_interval, but it will fast-forward beyond the latest execution date of either the last scheduled run or the last backfill run, whichever comes later. So at the moment the scheduler will not fill in the blanks for you if there is a gap of more than one interval between the scheduled run and a backfill run.
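The fast-forward rule described above can be sketched in a few lines. This is a simplified model, not the scheduler code from PR-1431: the `DagRun` dataclass and `next_scheduled_run` are hypothetical, the interval is assumed to be a fixed timedelta, and the very first run is assumed to start at `start_date`. The next scheduled run is placed one interval after the latest execution date among *all* runs, scheduled or backfill, and its `previous` points to that run by id.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class DagRun:
    """Hypothetical, simplified DagRun; `previous` holds the id of its predecessor."""
    id: int
    execution_date: datetime
    is_backfill: bool
    previous: Optional[int] = None  # None means this is the first DagRun


def next_scheduled_run(runs: List[DagRun], interval: timedelta,
                       start_date: datetime, new_id: int) -> DagRun:
    """Create the next scheduled DagRun, fast-forwarding past backfills.

    Any gap of more than one interval before the latest run is NOT filled in.
    """
    if not runs:
        # Assumption: the first run starts exactly at start_date.
        return DagRun(new_id, start_date, is_backfill=False, previous=None)
    latest = max(runs, key=lambda r: r.execution_date)
    return DagRun(new_id, latest.execution_date + interval,
                  is_backfill=False, previous=latest.id)
```

For example, with a daily interval, a scheduled run on 2016-05-01 and a backfill run on 2016-05-10, the next scheduled run lands on 2016-05-11 with `previous` set to the backfill's id; 2016-05-02 through 2016-05-09 are left empty, matching the "will not fill in the blanks" behaviour.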
>> Backfill will create a DagRun and insert it into the timeline, i.e. it will update the previous property of a scheduled DagRun that lies in the future as seen from the backfill DagRun. If it encounters a DagRun with the same execution date for the same dag_id, it will re-own the TaskInstances and set the state of the other DagRun to “overridden”, essentially orphaning that DagRun but keeping its record around for auditing purposes.
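The timeline insertion described above can be modelled as a singly linked list ordered by execution date. The sketch below is a hypothetical, in-memory illustration rather than the PR's implementation: `DagRun`, `insert_backfill` and the representation of TaskInstances as a list of task_id strings are all assumptions made for brevity. It shows both cases: splicing a backfill between two existing runs, and overriding a run at the same execution date while keeping the old record for auditing.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional

OVERRIDDEN = "overridden"


@dataclass
class DagRun:
    """Hypothetical DagRun; task instances are simplified to task_id strings."""
    id: int
    execution_date: datetime
    state: str = "running"
    previous: Optional[int] = None  # id of the previous DagRun, None = first
    task_instances: List[str] = field(default_factory=list)


def insert_backfill(runs: List[DagRun], backfill: DagRun) -> None:
    """Insert `backfill` into the timeline held in `runs` (modified in place)."""
    live = sorted((r for r in runs if r.state != OVERRIDDEN),
                  key=lambda r: r.execution_date)
    same = [r for r in live if r.execution_date == backfill.execution_date]
    if same:
        old = same[0]
        # Re-own the task instances and orphan the old run; its record stays.
        backfill.task_instances.extend(old.task_instances)
        old.task_instances.clear()
        old.state = OVERRIDDEN
        live.remove(old)
    before = [r for r in live if r.execution_date < backfill.execution_date]
    after = [r for r in live if r.execution_date > backfill.execution_date]
    # Link the backfill to the latest earlier run (or mark it as the first run).
    backfill.previous = before[-1].id if before else None
    if after:
        # The first later run, seen from the backfill, now points back at it.
        after[0].previous = backfill.id
    runs.append(backfill)
```

Keeping the overridden run in `runs` but excluding it from the live timeline mirrors the "orphaned but kept for auditing" behaviour; a database-backed version would do the same with a state filter in its queries.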
>> TaskInstances now maintain a reference to a DagRun, which cannot be None. If a TaskInstance is created with None, the reference will be set to the special “-1” DagRun ID. DagRun IDs that are None will lead to an integrity error at the database level. TaskInstances with the “-1” DagRun ID are treated as they are now. However, Airflow itself will no longer create such TaskInstances, so they can only come from outside. Seeing DAGs as the description of “coherence of work”, I have a lot of difficulty understanding why one would want work available that is not coherent.
>> Bolke
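The two layers of protection described in the last paragraph (a NOT NULL column in the database, plus a default that maps a missing DagRun to the “-1” sentinel) can be sketched with the standard-library sqlite3 module. The `task_instance` table below is a hypothetical, heavily reduced schema, not Airflow's actual one, and `add_task_instance` / `NO_DAG_RUN` are illustrative names.

```python
import sqlite3
from typing import Optional

NO_DAG_RUN = -1  # special DagRun id for TaskInstances created outside Airflow

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    " task_id TEXT NOT NULL,"
    " execution_date TEXT NOT NULL,"
    " dag_run_id INTEGER NOT NULL)"  # a NULL dag_run_id is rejected by the DB
)


def add_task_instance(task_id: str, execution_date: str,
                      dag_run_id: Optional[int] = None) -> int:
    """Insert a TaskInstance, mapping a missing DagRun to the -1 sentinel."""
    if dag_run_id is None:
        dag_run_id = NO_DAG_RUN
    conn.execute(
        "INSERT INTO task_instance VALUES (?, ?, ?)",
        (task_id, execution_date, dag_run_id),
    )
    return dag_run_id


if __name__ == "__main__":
    print(add_task_instance("extract", "2016-05-01"))     # -1 (created "outside")
    print(add_task_instance("load", "2016-05-01", 7))     # 7 (belongs to DagRun 7)
    try:
        # Bypassing the helper and writing NULL directly hits the constraint.
        conn.execute("INSERT INTO task_instance VALUES ('x', '2016-05-01', NULL)")
    except sqlite3.IntegrityError as exc:
        print("rejected:", exc)
```

Defaulting in application code keeps existing callers that pass no DagRun working, while the NOT NULL constraint catches any path that forgets to set one.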
