airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Klosowski <dav...@thinknear.com>
Subject Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Date Wed, 03 Aug 2016 03:02:59 GMT
start_date being updated isn't the issue here.  I haven't changed it.   New
execution_dates keep getting created for the past before any dags or
start_dates existed.

Cheers,
David

On Tue, Aug 2, 2016 at 7:10 PM, siddharth anand <sanand@apache.org> wrote:

> The problem might be that the start_date does not get updated. I work
> around this by changing the name of my dag. I do lose history as well, but
> it works.
>
> My dags are named "some_dag_v1". When I change a start date, I update the
> version suffix to force a reload : "some_dag_v2"
>
> -s
>
> On Tue, Aug 2, 2016 at 6:49 PM, David Klosowski <davidk@thinknear.com>
> wrote:
>
> > I have a DAG that I just deployed that the scheduler keeps scheduling for
> > the last two months in the past.
> >
> > start_date: 8/5/2016
> >
> > scheduled runs started:
> > 7/3/2016
> > 6/5/2016
> >
> > Here is the gist of this DAG's architecture:
> >
> > The DAG depends another dags tasks using 7 dynamic ExternalTaskSensors
> that
> > it builds which that represent 'daily' jobs and then has a DummyOperator
> > task which aggregates and triggers the 'weekly' job task upon completion.
> >
> > Some of the code showcasing this:
> >
> > run_for_date = datetime(2016, 8, 2)
> >
> > args = {'owner': 'airflow',
> >         'depends_on_past': False,
> >         'start_date': run_for_date,
> >         'email': [alert_email],
> >         'email_on_failure': True,
> >         'email_on_retry': False,
> >         'retries': 1,
> >         'trigger_rule' : 'all_success'}
> >
> > dag = DAG(dag_id='weekly_no_track', default_args=args,
> >           schedule_interval=timedelta(days=7),
> >           max_active_runs=1)
> >
> >
> > downstream_task = dag.get_task('wait-for-dailies')
> > for weekday in [MO, TU, WE, TH, FR, SA, SU]:
> >     task_id = 'wait-for-daily-{day}'.format(day=weekday)
> >
> >     # weekday(-1) subtracts 1 relative week from the given weekday,
> however
> > if the calculated date is already Monday,
> >     # for example, -1 won't change the day.
> >     delta = relativedelta(weekday=weekday(-1))
> >
> >     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
> >                                 external_dag_id='daily_no_track',
> > external_task_id='daily-no-track',
> >                                 execution_delta=delta, timeout=86400)  #
> > 86400 = 24 hours
> >     sensor.set_downstream(downstream_task)
> >
> >
> > I don't understand what is going on.  Why is the scheduler doing this?  I
> > want the DAG to start considering dates from today and on in UTC.
> >
> > Cheers,
> > David
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message