airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alek Storm <alek.st...@gmail.com>
Subject Re: Run job multiple times with same execution date
Date Thu, 27 Apr 2017 20:04:35 GMT
It's `0 2-20/3 * * *`. The problem with using an offset from the execution
date is that, for the very first run of the day, its execution date will
*already* be the previous day (since it covers 8pm the previous day til 2am
today) - subtracting from that would yield a date two days in the past,
which is not what we want. Subsequent runs (at 5am, 8am, etc) have their
execution date as the current day, so subtracting would work in those cases.

Alek

On Thu, Apr 27, 2017 at 2:55 PM, Paul Zaczkiewicz <paulzacz@gmail.com>
wrote:

> First off, what schedule are you giving your DAG? I assume it's something
> like "0 */4 * * *".
>
> Why don't you subtract 1 day from next_execution_date instead?
>
> On Thu, Apr 27, 2017 at 2:28 PM, Alek Storm <alek.storm@gmail.com> wrote:
>
> > Right, an easier way to do that would probably be to use the
> > `prev_execution_date` macro, but either way, it would give the wrong
> result
> > on the first run of the day, since its start_date would already be the
> > previous day.
> >
> > Alek
> >
> > On Thu, Apr 27, 2017 at 1:17 PM, Shah Altaf <mendhak@gmail.com> wrote:
> >
> > > Hello. You can get access to the execution date of today and subtract a
> > day
> > > from it.  You can use a PythonOperator with provide_context=True, that
> > will
> > > give your Python method a ds argument that has the execution date in
> it.
> > > Manipulate it and return an XCOM.
> > >
> > >  For example, here's a filename being derived from execution date minus
> > one
> > > day:
> > >
> > >     def get_filename_to_process(ds, **kwargs):
> > >         execdate=datetime.strptime(ds,"%Y-%m-%d")
> > >         my_file_path =
> > > "dirname/{0}{1:02d}{2:02d}000000.csv".format(execdate.
> > > year,execdate.month,execdate.day-1)
> > >         logging.info("my_file_path: ".format(my_file_path))
> > >         return my_file_path
> > >
> > >     get_filename = PythonOperator(
> > >                 task_id='the_filename',
> > >                 python_callable=get_filename_to_process,
> > >                 provide_context=True,
> > >                 dag=dag)
> > >
> > >
> > > Then, from other operators, you'd pull it in using some templating:
> > >
> > >     {{ task_instance.xcom_pull(task_ids='the_filename') }}
> > >
> > > Hope that helps, sorry if I misunderstood your question.
> > >
> > >
> > >
> > > On Thu, Apr 27, 2017 at 6:28 PM Alek Storm <alek.storm@gmail.com>
> wrote:
> > >
> > > > I'd like to create a job that runs six times a day, all with the same
> > > > execution date (the previous day). Is there a way to do this without
> > > > generating six separate DAGs, or running an `airflow clear` command
> in
> > > > between runs?
> > > >
> > > > Thanks,
> > > > Alek
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message