airflow-dev mailing list archives

From siddharth anand <>
Subject Re: scheduler questions
Date Thu, 13 Oct 2016 17:11:39 GMT

*Question 1*
Only_Run_Latest is in master, and it will solve your problem.

Releases come out once a quarter, sometimes only once every two quarters, so
I would recommend running off master or off your own fork.

You could also achieve this yourself with the following code snippet. It
uses a ShortCircuitOperator that skips downstream tasks if the DagRun being
executed is not the current one, and it works for any schedule. The
LatestOnlyOperator in master is essentially this code, packaged for
convenience.

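import logging
from datetime import datetime

from airflow.operators.python_operator import ShortCircuitOperator


def skip_to_current_job(ds, **kwargs):
    now = datetime.now()
    # The run for execution_date fires at the end of its interval
    # (left_window); the next run is due at right_window.
    left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
    right_window = kwargs['dag'].following_schedule(left_window)
    logging.info('Left Window {}, Now {}, Right Window {}'.format(
        left_window, now, right_window))

    # If the next run is already due, this run is not the latest one.
    if not now <= right_window:
        logging.info('Not latest execution, skipping downstream.')
        return False  # ShortCircuitOperator skips all downstream tasks

    return True


# `dag` is your existing DAG object
t0 = ShortCircuitOperator(
  task_id         = 'short_circuit_if_not_current',
  provide_context = True,
  python_callable = skip_to_current_job,
  dag             = dag)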
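
To check the window logic without an Airflow install, here is a plain-Python sketch of the same test for a fixed-interval schedule. The helper `is_latest_run` is hypothetical, and adding `interval` to a date is only a stand-in for `dag.following_schedule`, which also handles cron expressions. It replays the situation from the question below: a daily DAG whose scheduler comes back after missing a few days.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, now, interval):
    # Hypothetical helper: same test as skip_to_current_job, but for a
    # fixed-interval schedule. `execution_date + interval` stands in for
    # dag.following_schedule(execution_date).
    left_window = execution_date + interval  # when this run fires
    right_window = left_window + interval    # when the next run fires
    # If the next run is already due, a newer run exists and this one is stale.
    return now <= right_window


# Daily schedule; the scheduler comes back shortly after midnight on Oct 5
# and finds runs for Oct 2, 3 and 4 waiting.
day = timedelta(days=1)
now = datetime(2016, 10, 5, 1, 0)
for execution_date in [datetime(2016, 10, d) for d in (2, 3, 4)]:
    print(execution_date.date(), is_latest_run(execution_date, now, day))
# 2016-10-02 False
# 2016-10-03 False
# 2016-10-04 True
```

Only the newest pending run passes the check. In the Airflow version, the older runs return False and ShortCircuitOperator marks their downstream tasks as skipped.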



On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <> wrote:

> Hello all and thanks for such an amazing project! I have been evaluating
> Airflow and spent a few days reading about it and playing with it and I
> have a few questions that I struggle to understand.
> Let's say I have a simple DAG that runs once a day and does a full
> reload of tables from the source database, so the process is not
> incremental.
> Let's consider this scenario:
> Day 1 - OK
> Day 2 - the Airflow scheduler or the server running Airflow is down for
> some reason (or the DAG is paused)
> Day 3 - still down (or the DAG is paused)
> Day 4 - server is up and now needs to run missing jobs.
> How can I make Airflow run only the Day 4 job and not backfill Days 2 and 3?
> I tried setting depends_on_past = True, but it does not seem to do the trick.
> I also found this in a roadmap doc, but it seems it has not made it into a
> release yet:
>  Only Run Latest - Champion : Sid
> • For cases where we need to run only the latest in a series of task
> instance runs and mark the others as skipped. For example, we may have a job
> that takes a DB snapshot every day. If the DAG is paused for 5 days and
> then unpaused, we don’t want to run all 5, just the latest. With this
> feature, we will provide “cron” functionality for task scheduling that is
> not related to ETL.
> My second question: what if I have another DAG that does incremental loads
> from a source table?
> Day 1 - OK, loaded new/changed data for previous day
> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> Day 4 - source system is up, Airflow DagRun succeeded
> My problem (unless I am missing something) is that on Day 4 Airflow would
> use the execution time from Day 3, so the interval for the incremental load
> would start from the last run (which failed). I would like it to use the
> last _successful_ run instead, so that on Day 4 it would go back to Day 1.
> Is it possible to achieve this?
> I am aware of the manual backfill command in the CLI, but I am not sure I
> want to use it because of all the issues and inconsistencies I've read about.
> Thanks!
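
The reply above only covers Question 1. For the incremental-load question, the idea being asked for is a watermark: start each load from the most recent successful run rather than the immediately preceding one. This plain-Python sketch shows only that selection step. `incremental_window` and the `(execution_date, state)` history list are hypothetical; in a real DAG the run states would have to come from Airflow's metadata database or from a watermark you store yourself, which is not shown here.

```python
from datetime import datetime


def incremental_window(runs, current_execution_date):
    # Hypothetical helper. `runs` is a list of (execution_date, state)
    # pairs for earlier DagRuns. The window starts at the most recent
    # successful run, so days whose runs failed are loaded again.
    successes = [d for d, state in runs
                 if state == 'success' and d < current_execution_date]
    if not successes:
        raise ValueError('no earlier successful run to use as a watermark')
    return max(successes), current_execution_date


history = [
    (datetime(2016, 10, 1), 'success'),  # Day 1
    (datetime(2016, 10, 2), 'failed'),   # Day 2, source system down
    (datetime(2016, 10, 3), 'failed'),   # Day 3, source system down
]
start, end = incremental_window(history, datetime(2016, 10, 4))  # Day 4
print(start.date(), end.date())  # 2016-10-01 2016-10-04
```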
