airflow-commits mailing list archives

From "Maoya Sato (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-1148) Airflow cannot handle datetime(6) column values(execution_time, start_date, end_date)
Date Wed, 26 Apr 2017 05:26:04 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maoya Sato updated AIRFLOW-1148:
--------------------------------
    Description: 
Airflow cannot handle datetime(6) column values (execution_date, start_date, end_date, etc.):
{code}
mysql> select dag_id, execution_date from dag_run;
+-------------+----------------------------+
| dag_id      | execution_date             |
+-------------+----------------------------+
| test_dag    | 2017-04-26 13:15:00.000000 |
+-------------+----------------------------+
{code}
{code}
>>> from airflow import settings
>>> session = settings.Session()
>>> from airflow.models import DagRun
>>> dag = session.query(DagRun).filter_by(dag_id='test_dag').first()
>>> dag.execution_date
>>>
{code}
execution_date comes back as None, though it should be datetime(2017, 4, 26, 13, 15).
As far as I can tell, the datetime(6) column type is the cause: if I use a plain datetime
column without fractional seconds precision, it works.
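A possible illustration of how fractional seconds can trip up an older database driver (a minimal sketch using plain strptime; MySQL-Python's actual converter code is more involved) is that a parse without the fractional-seconds directive fails and yields None, mirroring the None seen on dag.execution_date above:

```python
from datetime import datetime

# Value as the MySQL client returns it from a DATETIME(6) column
raw = "2017-04-26 13:15:00.000000"

def parse(value, fmt):
    """Return a datetime, or None when the format does not match."""
    try:
        return datetime.strptime(value, fmt)
    except ValueError:
        return None

# Without %f the trailing ".000000" is unconverted data -> ValueError -> None
print(parse(raw, "%Y-%m-%d %H:%M:%S"))     # None
# With %f the full value parses correctly
print(parse(raw, "%Y-%m-%d %H:%M:%S.%f"))  # 2017-04-26 13:15:00
```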
It appears to be related to this migration, which adds fractional seconds precision to the datetime columns:
https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
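For reference, a migration of this kind typically looks roughly like the following (a sketch assuming Alembic and SQLAlchemy's MySQL dialect; the actual file linked above covers more tables and columns):

```python
# Alembic migration sketch: widen a MySQL DATETIME column to DATETIME(6)
from alembic import op
from sqlalchemy.dialects import mysql

def upgrade():
    # fsp=6 enables microsecond (fractional seconds) precision
    op.alter_column(table_name='dag_run', column_name='execution_date',
                    type_=mysql.DATETIME(fsp=6))

def downgrade():
    # Revert to a plain DATETIME with no fractional seconds
    op.alter_column(table_name='dag_run', column_name='execution_date',
                    type_=mysql.DATETIME())
```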

I've created a simple DAG to reproduce the issue (Python 2):
{code}
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 4, 26, 13, 15),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'airflow-dev',
    'end_date': datetime(2017, 4, 27, 0, 0)
}

dag = DAG(
    'test_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(minutes=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
{code}

The following error occurs:
{code}
{jobs.py:354} DagFileProcessor3 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 1583, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 1173, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 803, in create_dag_run
    while next_run_date <= last_run.execution_date:
TypeError: can't compare datetime.datetime to NoneType
{code}
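The TypeError at the bottom of the traceback follows directly from the None value above: comparing a datetime against None raises exactly this error under Python 2 (a minimal reproduction, independent of Airflow; Python 3 raises a TypeError with slightly different wording):

```python
from datetime import datetime

next_run_date = datetime(2017, 4, 26, 13, 15)
last_run_execution_date = None  # what the ORM hands back when the driver fails to parse

try:
    next_run_date <= last_run_execution_date
    print("comparison succeeded")
except TypeError as exc:
    # Python 2: "can't compare datetime.datetime to NoneType"
    # Python 3: "'<=' not supported between instances of 'datetime.datetime' and 'NoneType'"
    print("TypeError:", exc)
```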

> Airflow cannot handle datetime(6) column values(execution_time, start_date, end_date)
> -------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1148
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1148
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>         Environment: sql_alchemy_conn: cloudSQL via cloud_sql_proxy
> celery broker: amazon SQS
>            Reporter: Maoya Sato
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
