airflow-commits mailing list archives

From "Norman Mu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-392) DAG runs on strange schedule in the past when deployed
Date Fri, 05 Aug 2016 15:57:20 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409611#comment-15409611 ]

Norman Mu commented on AIRFLOW-392:
-----------------------------------

I'm going to read through what you posted and close this with [~sanand]'s go-ahead.

> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
>                 Key: AIRFLOW-392
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-392
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.7.1.3
>            Environment: AWS ElasticBeanstalk as a Docker application running in an Ubuntu-based container
>            Reporter: David Klosowski
>            Assignee: Norman Mu
>
> Just deployed a new DAG ('weekly-no-track') that depends on 7 DAG task runs of another DAG ('daily-no-track'). When the DAG is deployed, the scheduler schedules and runs multiple runs in the past (yesterday it ran for 6/12/2016 and 6/05/2016), despite the start date being set to the deployment date.
> It would be a bit difficult to include all the code being used in the DAG, since we have multiple libraries we've built in Python that are referenced here and that we want to eventually open source. I've included some of the code here. Let me know if this is all clear and what I can do to help, or if any insight can be provided as to why this is occurring and how we might fix it.
> {code}
> from __future__ import division, print_function
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor, TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper, HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir + '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'], **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'], **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
>                                         emr_config=emr_config, cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule' : 'all_success',
>         'emr_service_wrapper': emr_service_wrapper,
>         'hive_step_builder': hive_step_builder}
> user_defined_macros = {'hive_partitions': HivePartitions,
>                        'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
> dag = DAG(dag_id='weekly_no_track', default_args=args, user_defined_macros=user_defined_macros, params=params,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
> # === task definitions
> task_definitions = {
>     'wait-for-dailies': {
>         'operator_type': 'dummy_operator', # hub for custom defined dependencies
>         'operator_args': {},
>         'depends_on': []
>     },
>     'weekly-no-track': {
>         'operator_type': 'emr_hive_operator',
>         'operator_args': {
>             'hive_step': {
>                 'script': 'weekly-no-track-airflow',  # temporarily modified script with separate output path
>                 'cluster_name': 'geoprofile',
>                 'script_vars': merge_dicts(hive_params.default_params(), hive_params.rundate_params(), {
>                     'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts, days=-6), ts_add(ts, days=1))}}',
>                 }),
>             }
>         },
>         'depends_on': ['wait-for-dailies']
>     }
> }
> # === /task definitions
> operator_builders = {'emr_hive_operator': emr_hive_operator,
>                      'time_delta_sensor': TimeDeltaSensor,
>                      'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>     # weekday(-1) subtracts 1 relative week from the given weekday; however, if the calculated date
>     # is already Monday, for example, -1 won't change the day.
>     delta = relativedelta(weekday=weekday(-1))
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track', external_task_id='daily-no-track',
>                                 execution_delta=delta, timeout=86400)  # 86400 = 24 hours
>     sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
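>
> For reference, a rough sketch (illustrative only; the function name is made up, not from the code above) of how the 1.7.x scheduler derives run dates from {{start_date}} and {{schedule_interval}}: a run dated X is only created once X + schedule_interval has passed, so every completed interval between the start date and "now" produces a run dated in the past.
> {code}
> from datetime import datetime, timedelta
>
> def expected_run_dates(start_date, schedule_interval, now):
>     """Illustrative only: list the execution_dates the scheduler would create."""
>     run_dates = []
>     execution_date = start_date
>     # A run dated execution_date is created once its full interval has elapsed,
>     # i.e. once execution_date + schedule_interval is already in the past.
>     while execution_date + schedule_interval <= now:
>         run_dates.append(execution_date)
>         execution_date += schedule_interval
>     return run_dates
>
> # e.g. a start date left over from an earlier deployment quickly yields several "past" weekly runs:
> print(expected_run_dates(datetime(2016, 6, 5), timedelta(days=7), datetime(2016, 8, 5)))
> # -> weekly execution_dates from 2016-06-05 through 2016-07-24
> {code}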
> Some referenced code
> {{common_args.py}}
> {code}
> from __future__ import division, print_function
> from copy import copy
> class CommonHiveParams(object):
>     def __init__(self, app_properties_hive):
>         super(CommonHiveParams, self).__init__()
>         # TODO: this should be part of a config object
>         self.app_properties_hive = app_properties_hive
>     def default_params(self):
>         return {
>             'HIVE_LIBS_BUCKET': self.app_properties_hive['S3_HIVE_LIB_BUCKET'],
>             'STAGE': '{{params.stage}}',
>         }
>     @staticmethod
>     def rundate_params():
>         return {
>             'YEAR': '{{execution_date.strftime("%Y")}}',
>             'MONTH': '{{execution_date.strftime("%m")}}',
>             'DAY': '{{execution_date.strftime("%d")}}',
>             'HOUR': '{{execution_date.strftime("%H")}}',
>             'MINUTE': '{{execution_date.strftime("%M")}}',
>         }
> def merge_dicts(*dicts):
>     """ Merge provided dicts without modification.
>     Duplicate keys are overwritten with values from the rightmost applicable dict.
>     """
>     if len(dicts) == 0:
>         return {}
>     result = copy(dicts[0])
>     for d in dicts[1:]:
>         result.update(d)
>     return result
> {code}
> {{operator_builders.py}}
> {code}
> """Functions for building operators from dict property definitions."""
> from __future__ import division, print_function
> from tn_airflow_components.operators.emr import EmrHiveOperator, create_emr_operator_with_step_sensor
> # TODO: this should not be a single package. Not every DAG needs EMR as a dependency, for example.
> def emr_hive_operator(task_id, dag, hive_step, **kwargs):
>     return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag,
>                                                 main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step,
>                                                 **kwargs)
> def add_tasks(task_definitions, dag, operator_builders):
>     """Add tasks from dict definitions
>     :param task_definitions: dict of task definitions. Keys in the top-level dict are used as the task IDs
>     :type task_definitions: dict
>     :param dag: the DAG in which to define the tasks
>     :type dag: airflow.models.DAG
>     :param operator_builders: mapping of str 'operator_type' values to operator builder functions
>     :type operator_builders: dict
>     """
>     for task_id in task_definitions.keys():
>         task_definition = task_definitions[task_id]
>         operator_type = task_definition['operator_type']
>         operator = operator_builders[operator_type](task_id=task_id, dag=dag, **task_definition['operator_args'])
>         if task_definition['depends_on']:
>             for dependency in task_definition['depends_on']:
>                 operator.set_upstream(dag.get_task(dependency))
> {code}
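>
> For completeness, an illustrative check (not part of the DAG file) of how each sensor's {{execution_delta}} maps the weekly DAG's execution_date onto the daily run it waits for: ExternalTaskSensor pokes the external task at execution_date - execution_delta, and with a relativedelta that only sets weekday(-1) this lands on the most recent such weekday on or before the weekly execution_date.
> {code}
> from datetime import datetime
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
>
> weekly_execution_date = datetime(2016, 8, 8)  # a Monday, chosen for illustration
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     delta = relativedelta(weekday=weekday(-1))
>     # The daily_no_track run this sensor would wait for:
>     print(weekday, weekly_execution_date - delta)
> # Output (times omitted): MO 2016-08-08, TU 2016-08-02, WE 2016-08-03, TH 2016-08-04,
> # FR 2016-08-05, SA 2016-08-06, SU 2016-08-07; together the seven sensors cover the
> # seven consecutive daily runs ending on the weekly execution_date.
> {code}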



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
