airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ruslan Dautkhanov <dautkha...@gmail.com>
Subject Re: Reminder : LatestOnlyOperator
Date Mon, 20 Mar 2017 16:45:43 GMT
Thanks Boris. It does make sense.
Although how it's different from depends_on_past task-level parameter?
In both cases, a task will be skipped if there is another TI of this task
is still running (from a previous dagrun), right?


Thanks,
Ruslan


On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin <boris@boristyukin.com> wrote:

> you would just chain them - there is an example that came with airflow 1.8
> https://github.com/apache/incubator-airflow/blob/master/
> airflow/example_dags/example_latest_only.py
>
> so in your case, instead of dummy operator, you would use your Oracle
> operator.
>
> Does it make sense?
>
>
> On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov <dautkhanov@gmail.com>
> wrote:
>
> > Is there is a way to combine scheduling behavior operators  (like this
> > LatestOnlyOperator)
> > with a functional operator (like Oracle_Operator)? I was thinking
> multiple
> > inheritance would do,like
> >
> > > class Oracle_LatestOnly_Operator (Oracle_Operator, LatestOnlyOperator):
> > > ...
> >
> > I might be overthinking this and there could be a simpler way?
> > Sorry, I am still learning Airflow concepts...
> >
> > Thanks.
> >
> >
> >
> > --
> > Ruslan Dautkhanov
> >
> > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <boris@boristyukin.com>
> > wrote:
> >
> > > Thanks George for that feature!
> > >
> > > sure, just created a jira on this
> > > https://issues.apache.org/jira/browse/AIRFLOW-1008
> > >
> > >
> > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand <sanand@apache.org>
> > > wrote:
> > >
> > > > Thx Boris . Credit goes to George (gwax) for the implementation of
> the
> > > > LatestOnlyOperator.
> > > >
> > > > Boris,
> > > > Can you describe what you mean in a Jira?
> > > > -s
> > > >
> > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <boris@boristyukin.com
> >
> > > > wrote:
> > > >
> > > > > this is nice indeed along with the new catchup option
> > > > > https://airflow.incubator.apache.org/scheduler.html#
> > > backfill-and-catchup
> > > > >
> > > > > Thanks Sid and Ben for adding these new options!
> > > > >
> > > > > for a complete picture, it would be nice to force only one dag run
> at
> > > the
> > > > > time.
> > > > >
> > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <
> sanand@apache.org>
> > > > > wrote:
> > > > >
> > > > > > With the Apache Airflow 1.8 release imminent, you may want to
try
> > out
> > > > the
> > > > > >
> > > > > > *LatestOnlyOperator.*
> > > > > >
> > > > > > If you want your DAG to only run on the most recent scheduled
> slot,
> > > > > > regardless of backlog, this operator will skip running downstream
> > > tasks
> > > > > for
> > > > > > all DAG Runs prior to the current time slot.
> > > > > >
> > > > > > For example, I might have a DAG that takes a DB snapshot once
a
> > day.
> > > It
> > > > > > might be that I paused that DAG for 2 weeks or that I had set
the
> > > start
> > > > > > date to a fixed data 2 weeks in the past. When I enable my DAG,
I
> > > don't
> > > > > > want it to run 14 days' worth of snapshots for the current state
> of
> > > the
> > > > > DB
> > > > > > -- that's unnecessary work.
> > > > > >
> > > > > > The LatestOnlyOperator avoids that work.
> > > > > >
> > > > > > https://github.com/apache/incubator-airflow/commit/
> > > > > > edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> > > > > >
> > > > > > With it, you can simply use
> > > > > > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> > > > > >
> > > > > > instead of
> > > > > > def skip_to_current_job(ds, **kwargs):
> > > > > >     now = datetime.now()
> > > > > >     left_window = kwargs['dag'].following_
> > > schedule(kwargs['execution_
> > > > > > date'])
> > > > > >     right_window = kwargs['dag'].following_schedule(left_window)
> > > > > >     logging.info(('Left Window {}, Now {}, Right Window
> > > > > > {}').format(left_window,now,right_window))
> > > > > >     if not now <= right_window:
> > > > > >         logging.info('Not latest execution, skipping
> downstream.')
> > > > > >         return False
> > > > > >     return True
> > > > > >
> > > > > > short_circuit = ShortCircuitOperator(
> > > > > >   task_id         = 'short_circuit_if_not_current_job',
> > > > > >   provide_context = True,
> > > > > >   python_callable = skip_to_current_job,
> > > > > >   dag             = dag
> > > > > > )
> > > > > >
> > > > > > -s
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message