airflow-dev mailing list archives

From siddharth anand <san...@apache.org>
Subject Reminder : LatestOnlyOperator
Date Fri, 17 Mar 2017 23:33:11 GMT
With the Apache Airflow 1.8 release imminent, you may want to try out the

*LatestOnlyOperator.*

If you want your DAG to only run on the most recent scheduled slot,
regardless of backlog, this operator will skip running downstream tasks for
all DAG Runs prior to the current time slot.

For example, I might have a DAG that takes a DB snapshot once a day. I
might have paused that DAG for 2 weeks, or set its start date to a fixed
date 2 weeks in the past. When I enable the DAG, I don't want it to take
14 days' worth of snapshots of the current state of the DB -- that's
unnecessary work.

The LatestOnlyOperator avoids that work.
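
To make the window check concrete, here is a minimal sketch of the idea in
plain Python, with no Airflow dependency. It assumes a fixed daily interval,
and `is_latest_run` and its parameters are hypothetical names chosen for
illustration; this is not the operator's actual code.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, now, interval=timedelta(days=1)):
    """Return True if `now` falls in the window where this run is the latest.

    Assumption: a run stamped `execution_date` covers one interval and is
    triggered when that interval ends, so it stays "latest" until one more
    interval has elapsed.
    """
    left_window = execution_date + interval
    right_window = left_window + interval
    return left_window < now <= right_window


# A daily DAG whose start date is 14 days in the past, enabled "now".
now = datetime(2017, 3, 17, 12, 0)
first = datetime(2017, 3, 3)
backlog = [first + timedelta(days=i) for i in range(14)]

# Only the most recent slot passes the check; the other 13 would be skipped.
latest = [d for d in backlog if is_latest_run(d, now)]
```

Under these assumptions, only the 2017-03-16 run does the snapshot work; the
13 older runs in the backlog would have their downstream tasks skipped.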

https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a

With it, you can simply use
from airflow.operators.latest_only_operator import LatestOnlyOperator

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

instead of
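from datetime import datetime
import logging

from airflow.operators.python_operator import ShortCircuitOperator

def skip_to_current_job(ds, **kwargs):
    # This run counts as the latest while "now" is no later than the end of
    # the schedule slot that follows its execution_date.
    now = datetime.now()
    left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
    right_window = kwargs['dag'].following_schedule(left_window)
    logging.info('Left Window {}, Now {}, Right Window {}'.format(
        left_window, now, right_window))
    if not now <= right_window:
        logging.info('Not latest execution, skipping downstream.')
        return False
    return True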

short_circuit = ShortCircuitOperator(
  task_id         = 'short_circuit_if_not_current_job',
  provide_context = True,
  python_callable = skip_to_current_job,
  dag             = dag
)

-s
