airflow-dev mailing list archives

From siddharth anand <san...@apache.org>
Subject Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Date Wed, 03 Aug 2016 02:10:56 GMT
The problem might be that the start_date does not get updated. I work
around this by renaming my DAG. I lose the DAG's history, but it works.
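To make that hypothesis concrete, here is a simplified model of the behaviour described above. It assumes the scheduler resumes from the most recent run it has recorded for a given dag_id and only falls back to start_date when no run exists. This is an illustration, not Airflow's actual scheduler code; `next_run_date` and the `runs_by_dag_id` store are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def next_run_date(
    dag_id: str,
    start_date: datetime,
    interval: timedelta,
    runs_by_dag_id: Dict[str, List[datetime]],
) -> datetime:
    """Hypothetical, simplified scheduling rule (not Airflow's real code).

    If the dag_id already has recorded runs, continue one interval after the
    latest one; start_date is only consulted when there is no history.
    """
    history = runs_by_dag_id.get(dag_id, [])
    if history:
        return max(history) + interval
    return start_date


# A run recorded under the old dag_id by an earlier deployment
# (illustrative date taken from the thread).
runs = {"weekly_no_track": [datetime(2016, 6, 5)]}
interval = timedelta(days=7)
new_start = datetime(2016, 8, 2)

# Changing start_date alone: the recorded history wins.
print(next_run_date("weekly_no_track", new_start, interval, runs))
# Renaming the DAG: no history, so the new start_date is used.
print(next_run_date("weekly_no_track_v2", new_start, interval, runs))
```

Under this model, editing start_date has no effect once runs exist, while a new dag_id starts cleanly from start_date.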

My DAGs are named like "some_dag_v1". When I change a start date, I bump the
version suffix to force a reload: "some_dag_v2".
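A minimal sketch of that version-suffix convention, assuming the suffix lives in a single constant so that bumping it and changing the start date happen in the same edit. `DAG_VERSION`, `START_DATE` and `versioned_dag_id` are hypothetical names, not part of Airflow.

```python
from datetime import datetime

# Hypothetical convention: bump DAG_VERSION whenever START_DATE changes,
# so the scheduler sees a brand-new dag_id with no run history.
DAG_VERSION = 2
START_DATE = datetime(2016, 8, 2)


def versioned_dag_id(base_name: str, version: int) -> str:
    """Return a dag_id such as 'some_dag_v2'."""
    if version < 1:
        raise ValueError("version must be a positive integer")
    return "{}_v{}".format(base_name, version)


print(versioned_dag_id("some_dag", DAG_VERSION))  # some_dag_v2
```

In a DAG file this would be passed as `DAG(dag_id=versioned_dag_id('some_dag', DAG_VERSION), start_date=START_DATE, ...)`. The trade-off is the one already mentioned: runs recorded under the old name are no longer associated with the new dag_id.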

-s

On Tue, Aug 2, 2016 at 6:49 PM, David Klosowski <davidk@thinknear.com> wrote:

> I have a DAG that I just deployed, and the scheduler keeps scheduling runs
> for dates up to two months in the past.
>
> start_date: 8/5/2016
>
> scheduled runs started:
> 7/3/2016
> 6/5/2016
>
> Here is the gist of this DAG's architecture:
>
> The DAG depends on another DAG's tasks through 7 dynamically built
> ExternalTaskSensors, which represent the 'daily' jobs. A DummyOperator task
> then aggregates them and triggers the 'weekly' job task upon completion.
>
> Some of the code showcasing this:
>
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.sensors import ExternalTaskSensor
> from dateutil.relativedelta import relativedelta, MO, TU, WE, TH, FR, SA, SU
>
> run_for_date = datetime(2016, 8, 2)
>
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],  # alert_email is defined elsewhere (not shown)
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule': 'all_success'}
>
> dag = DAG(dag_id='weekly_no_track', default_args=args,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
>
> # Assumes the 'wait-for-dailies' DummyOperator was already added to dag (not shown).
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>
>     # weekday(-1) means "the previous such weekday"; if the date already
>     # falls on that weekday (e.g. MO(-1) on a Monday), the day is unchanged.
>     delta = relativedelta(weekday=weekday(-1))
>
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track',
>                                 external_task_id='daily-no-track',
>                                 execution_delta=delta,
>                                 timeout=86400)  # 86400 s = 24 hours
>     sensor.set_downstream(downstream_task)
>
>
> I don't understand what is going on. Why is the scheduler doing this? I
> want the DAG to start considering dates from today onward, in UTC.
>
> Cheers,
> David
>
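The comment in the quoted loop reads `relativedelta(weekday=X(-1))` as the previous X, or the same day if the date already falls on X. A stdlib-only sketch of that rule shows which daily dates the seven sensors would target for the weekly execution date 2016-08-02 (a Tuesday). It assumes that reading of dateutil's behaviour and that the sensor resolves each offset to these dates; `previous_or_same_weekday` is a hypothetical helper, not a dateutil or Airflow function.

```python
from datetime import datetime, timedelta

# Python's weekday numbering: Monday == 0 ... Sunday == 6.
WEEKDAYS = ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]


def previous_or_same_weekday(day: datetime, target: int) -> datetime:
    """Most recent date on or before `day` that falls on weekday `target`.

    Stdlib stand-in for the comment's reading of relativedelta(weekday=X(-1)):
    if `day` is already the target weekday, it is returned unchanged.
    """
    return day - timedelta(days=(day.weekday() - target) % 7)


# Weekly execution date taken from run_for_date in the quoted code.
execution_date = datetime(2016, 8, 2)
for index, name in enumerate(WEEKDAYS):
    daily_date = previous_or_same_weekday(execution_date, index)
    print(name, daily_date.date())
```

For that Tuesday the targets run from Wednesday 2016-07-27 through Tuesday 2016-08-02, i.e. the seven days ending on the weekly execution date.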
