airflow-commits mailing list archives

From "Bolke de Bruin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-1013) airflow/jobs.py:manage_slas() exception for @once dag
Date Mon, 03 Apr 2017 21:15:41 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin updated AIRFLOW-1013:
------------------------------------
    Priority: Critical  (was: Blocker)

> airflow/jobs.py:manage_slas() exception for @once dag
> -----------------------------------------------------
>
>                 Key: AIRFLOW-1013
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1013
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Ruslan Dautkhanov
>            Assignee: Siddharth Anand
>            Priority: Critical
>              Labels: dagrun, once, scheduler, sla
>             Fix For: 1.8.1
>
>
> Getting the following exception:
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
>     self.manage_slas(dag)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
>     while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:manage_slas():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
>         ts = datetime.now()
>         SlaMiss = models.SlaMiss
>         for ti in max_tis:
>             task = dag.get_task(ti.task_id)
>             dttm = ti.execution_date
>             if task.sla:
>                 dttm = dag.following_schedule(dttm)
>   >>>           while dttm < datetime.now():          <<< here
>                     following_schedule = dag.following_schedule(dttm)
>                     if following_schedule + task.sla < datetime.now():
>                         session.merge(models.SlaMiss(
>                             task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> main_dag = DAG(
>     dag_id                         = 'DISCOVER-Oracle-Load',
>     default_args                   = default_args,           
>     user_defined_macros            = dag_macros,       
>     start_date                     = datetime.now(),         
>     catchup                        = False,                  
>     schedule_interval              = '@once',                
>     concurrency                    = 2,                      
>     max_active_runs                = 1,                      
>     dagrun_timeout                 = timedelta(days=4),      
> )
> {code}
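
The traceback and the loop quoted above suggest that the comparison fails once the schedule lookup yields None. The following is a minimal, standalone sketch of how such a loop could be guarded. It does not import Airflow and is not the fix applied in 1.8.1: following_schedule() and missed_sla_dates() are hypothetical stand-ins, and modelling '@once' as "no interval, so no following schedule" is an assumption taken from the reporter's question.

```python
from datetime import datetime, timedelta


def following_schedule(dttm, interval):
    """Hypothetical stand-in for dag.following_schedule().

    Assumption: a DAG with no repeating interval (as suspected for
    '@once') has no following schedule, so None is returned.
    """
    if interval is None:
        return None
    return dttm + interval


def missed_sla_dates(execution_date, interval, sla, now):
    """Return the schedule dates whose SLA deadline has already passed."""
    missed = []
    dttm = following_schedule(execution_date, interval)
    # Guard: stop as soon as there is no further schedule to compare,
    # instead of comparing None with a datetime.
    while dttm is not None and dttm < now:
        nxt = following_schedule(dttm, interval)
        if nxt is not None and nxt + sla < now:
            missed.append(dttm)
        dttm = nxt
    return missed


now = datetime(2017, 3, 19, 20, 0)
start = datetime(2017, 3, 19, 12, 0)

# No interval (the suspected '@once' case): nothing to check, no TypeError.
once_result = missed_sla_dates(start, None, timedelta(minutes=30), now)

# Hourly interval: the loop walks the schedule as before.
hourly_result = missed_sla_dates(start, timedelta(hours=1), timedelta(minutes=30), now)
```

With the guard in place the no-interval case simply produces an empty result, while a repeating schedule is still walked date by date.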



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
