airflow-dev mailing list archives

From Boris Tyukin <bo...@boristyukin.com>
Subject Re: Reminder : LatestOnlyOperator
Date Mon, 20 Mar 2017 18:12:30 GMT
depends_on_past looks at the previous task instance, which sounds the same
as LatestOnlyOperator, but the difference becomes apparent if you look at
this example.

Let's say you have a DAG scheduled to run every day, and it has been
failing for the past 3 days. The whole purpose of that DAG is to populate a
snapshot table or do a daily backup. If you use depends_on_past, you would
have to rerun all the missed runs or mark them as successful, and a rerun
ends up doing useless work (3 daily snapshots or backups of the same data).

LatestOnlyOperator lets you bypass the missed runs and do the work just
once, for the most recent run.
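
To make the comparison concrete, here is a small plain-Python sketch (not
Airflow code). It assumes a fixed daily interval instead of a cron schedule,
and the helper name runs_that_do_work is made up for illustration. The
latest-only branch mirrors the window check from Sid's snippet quoted further
down: a run does real work only while the current time is no later than the
end of the schedule window that follows its execution date.

```python
from datetime import datetime, timedelta


def runs_that_do_work(execution_dates, interval, now, latest_only):
    """Return the execution dates whose downstream tasks would really run.

    Simplified model under stated assumptions, not Airflow's implementation:
    - without latest_only, every backlogged run is processed in order,
      which is what rerunning a failed depends_on_past chain amounts to;
    - with latest_only, a run does work only if `now` is not past the end
      of the schedule window following its execution date.
    """
    ordered = sorted(execution_dates)
    if not latest_only:
        return ordered
    # left window = execution_date + interval, right window = left + interval
    return [d for d in ordered if now <= d + 2 * interval]


interval = timedelta(days=1)
now = datetime(2017, 3, 20, 18, 0)
# Three failed daily runs plus the current one (hypothetical dates).
backlog = [datetime(2017, 3, day) for day in (16, 17, 18, 19)]

with_depends_on_past = runs_that_do_work(backlog, interval, now, latest_only=False)
with_latest_only = runs_that_do_work(backlog, interval, now, latest_only=True)

print(len(with_depends_on_past), "snapshots when every missed run is rerun")
print(len(with_latest_only), "snapshot when only the latest run does work")
```

In this model the backlog costs four snapshots of the same data when every
run is replayed, and one when only the latest run is allowed through.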

Another difference: depends_on_past is tricky if you use a branch operator
such as BranchPythonOperator, because some branches may not run one day and
then run the next, which can really mess up your logic.

On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov <dautkhanov@gmail.com>
wrote:

> Thanks Boris. It does make sense.
> But how is it different from the depends_on_past task-level parameter?
> In both cases, a task will be skipped if another TI of this task is
> still running (from a previous dagrun), right?
>
>
> Thanks,
> Ruslan
>
>
> On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin <boris@boristyukin.com>
> wrote:
>
> > you would just chain them - there is an example that came with airflow 1.8
> > https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_latest_only.py
> >
> > so in your case, instead of dummy operator, you would use your Oracle
> > operator.
> >
> > Does it make sense?
> >
> >
> > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov <dautkhanov@gmail.com>
> > wrote:
> >
> > > Is there a way to combine scheduling-behavior operators (like this
> > > LatestOnlyOperator) with a functional operator (like Oracle_Operator)?
> > > I was thinking multiple inheritance would do, like
> > >
> > > > class Oracle_LatestOnly_Operator(Oracle_Operator, LatestOnlyOperator):
> > > > ...
> > >
> > > I might be overthinking this and there could be a simpler way?
> > > Sorry, I am still learning Airflow concepts...
> > >
> > > Thanks.
> > >
> > >
> > >
> > > --
> > > Ruslan Dautkhanov
> > >
> > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <boris@boristyukin.com>
> > > wrote:
> > >
> > > > Thanks George for that feature!
> > > >
> > > > sure, just created a jira on this
> > > > https://issues.apache.org/jira/browse/AIRFLOW-1008
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand <sanand@apache.org>
> > > > wrote:
> > > >
> > > > > Thx Boris. Credit goes to George (gwax) for the implementation of the
> > > > > LatestOnlyOperator.
> > > > >
> > > > > Boris,
> > > > > Can you describe what you mean in a Jira?
> > > > > -s
> > > > >
> > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <boris@boristyukin.com>
> > > > > wrote:
> > > > >
> > > > > > this is nice indeed along with the new catchup option
> > > > > > https://airflow.incubator.apache.org/scheduler.html#backfill-and-catchup
> > > > > >
> > > > > > Thanks Sid and Ben for adding these new options!
> > > > > >
> > > > > > for a complete picture, it would be nice to force only one dag run at a
> > > > > > time.
> > > > > >
> > > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <sanand@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > With the Apache Airflow 1.8 release imminent, you may want to try out the
> > > > > > >
> > > > > > > *LatestOnlyOperator.*
> > > > > > >
> > > > > > > If you want your DAG to only run on the most recent scheduled slot,
> > > > > > > regardless of backlog, this operator will skip running downstream tasks
> > > > > > > for all DAG Runs prior to the current time slot.
> > > > > > >
> > > > > > > For example, I might have a DAG that takes a DB snapshot once a day. It
> > > > > > > might be that I paused that DAG for 2 weeks or that I had set the start
> > > > > > > date to a fixed date 2 weeks in the past. When I enable my DAG, I don't
> > > > > > > want it to run 14 days' worth of snapshots for the current state of the
> > > > > > > DB -- that's unnecessary work.
> > > > > > >
> > > > > > > The LatestOnlyOperator avoids that work.
> > > > > > >
> > > > > > > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> > > > > > >
> > > > > > > With it, you can simply use
> > > > > > > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> > > > > > >
> > > > > > > instead of
> > > > > > > def skip_to_current_job(ds, **kwargs):
> > > > > > >     now = datetime.now()
> > > > > > >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
> > > > > > >     right_window = kwargs['dag'].following_schedule(left_window)
> > > > > > >     logging.info(('Left Window {}, Now {}, Right Window {}').format(left_window,now,right_window))
> > > > > > >     if not now <= right_window:
> > > > > > >         logging.info('Not latest execution, skipping downstream.')
> > > > > > >         return False
> > > > > > >     return True
> > > > > > >
> > > > > > > short_circuit = ShortCircuitOperator(
> > > > > > >   task_id         = 'short_circuit_if_not_current_job',
> > > > > > >   provide_context = True,
> > > > > > >   python_callable = skip_to_current_job,
> > > > > > >   dag             = dag
> > > > > > > )
> > > > > > >
> > > > > > > -s
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
