airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1927] Convert naive datetimes for TaskInstances
Date Tue, 06 Feb 2018 16:26:14 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c458a22cf -> 772dbae29


[AIRFLOW-1927] Convert naive datetimes for TaskInstances

TaskInstances are sometimes instantiated outside core
Airflow with naive datetimes. In case this happens we
now default to using the time zone of the DAG if that
is available or the default system time zone.

Closes #2946 from bolkedebruin/AIRFLOW-1927


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/772dbae2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/772dbae2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/772dbae2

Branch: refs/heads/master
Commit: 772dbae298680feb9d521e7cd5526f4059d7cb69
Parents: c458a22
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Tue Feb 6 17:26:08 2018 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Tue Feb 6 17:26:08 2018 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py |  2 +-
 airflow/models.py  | 18 ++++++++++++++++--
 docs/timezone.rst  |  6 +++---
 setup.py           |  2 +-
 tests/jobs.py      | 25 +++++++++++++++++++++++++
 tests/models.py    | 23 +++++++++++++++++++++++
 6 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index d0c11d3..6bfcdcc 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -27,7 +27,7 @@ from importlib import import_module
 import argparse
 from builtins import input
 from collections import namedtuple
-from dateutil.parser import parse as parsedate
+from airflow.utils.timezone import parse as parsedate
 import json
 from tabulate import tabulate
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 9854213..78d1580 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -797,8 +797,23 @@ class TaskInstance(Base, LoggingMixin):
     def __init__(self, task, execution_date, state=None):
         self.dag_id = task.dag_id
         self.task_id = task.task_id
-        self.execution_date = execution_date
         self.task = task
+        self._log = logging.getLogger("airflow.task")
+
+        # make sure we have a localized execution_date stored in UTC
+        if execution_date and not timezone.is_localized(execution_date):
+            self.log.warning("execution date %s has no timezone information. Using "
+                             "default from dag or system", execution_date)
+            if self.task.has_dag():
+                execution_date = timezone.make_aware(execution_date,
+                                                     self.task.dag.timezone)
+            else:
+                execution_date = timezone.make_aware(execution_date)
+
+            execution_date = timezone.convert_to_utc(execution_date)
+
+        self.execution_date = execution_date
+
         self.queue = task.queue
         self.pool = task.pool
         self.priority_weight = task.priority_weight_total
@@ -810,7 +825,6 @@ class TaskInstance(Base, LoggingMixin):
             self.state = state
         self.hostname = ''
         self.init_on_load()
-        self._log = logging.getLogger("airflow.task")
         # Is this TaskInstance being currently running within `airflow run --raw`.
         # Not persisted to the database so only valid for the current process
         self.is_raw = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/docs/timezone.rst
----------------------------------------------------------------------
diff --git a/docs/timezone.rst b/docs/timezone.rst
index ca30686..9e8598e 100644
--- a/docs/timezone.rst
+++ b/docs/timezone.rst
@@ -2,9 +2,9 @@ Time zones
 ==========
 
 Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database.
- It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not them to the end 
- user’s time zone in the user interface. Also templates used in Operators are not translated. Time zone information 
- is exposed and it is left up to the writer of DAG what do with it.
+It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the 
+end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators 
+are not converted. Time zone information is exposed and it is up to the writer of DAG what do with it.
 
 This is handy if your users live in more than one time zone and you want to display datetime information according to 
 each user’s wall clock.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 0b9c680..148e53d 100644
--- a/setup.py
+++ b/setup.py
@@ -221,7 +221,7 @@ def do_setup():
             'lxml>=3.6.0, <4.0',
             'markdown>=2.5.2, <3.0',
             'pandas>=0.17.1, <1.0.0',
-            'pendulum==1.3.2',
+            'pendulum==1.4.0',
             'psutil>=4.2.0, <5.0.0',
             'pygments>=2.0.1, <3.0',
             'python-daemon>=2.1.1, <2.2',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index aa78721..b2ca15e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -333,6 +333,31 @@ class BackfillJobTest(unittest.TestCase):
         ti_dependent.refresh_from_db()
         self.assertEquals(ti_dependent.state, State.SUCCESS)
 
+    def test_run_naive_taskinstance(self):
+        """
+        Test that we can run naive (non-localized) task instances
+        """
+        NAIVE_DATE = datetime.datetime(2016, 1, 1)
+        dag_id = 'test_run_ignores_all_dependencies'
+
+        dag = self.dagbag.get_dag('test_run_ignores_all_dependencies')
+        dag.clear()
+
+        task0_id = 'test_run_dependent_task'
+        args0 = ['run',
+                 '-A',
+                 dag_id,
+                 task0_id,
+                 NAIVE_DATE.isoformat()]
+
+        cli.run(self.parser.parse_args(args0))
+        ti_dependent0 = TI(
+            task=dag.get_task(task0_id),
+            execution_date=NAIVE_DATE)
+
+        ti_dependent0.refresh_from_db()
+        self.assertEquals(ti_dependent0.state, State.FAILED)
+
     def test_cli_backfill_depends_on_past(self):
         """
         Test that CLI respects -I argument

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 17a9043..c89296e 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -898,6 +898,29 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertTrue(
             op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9))
 
+    def test_timezone_awareness(self):
+        NAIVE_DATETIME = DEFAULT_DATE.replace(tzinfo=None)
+
+        # check ti without dag (just for bw compat)
+        op_no_dag = DummyOperator(task_id='op_no_dag')
+        ti = TI(task=op_no_dag, execution_date=NAIVE_DATETIME)
+
+        self.assertEquals(ti.execution_date, DEFAULT_DATE)
+
+        # check with dag without localized execution_date
+        dag = DAG('dag', start_date=DEFAULT_DATE)
+        op1 = DummyOperator(task_id='op_1')
+        dag.add_task(op1)
+        ti = TI(task=op1, execution_date=NAIVE_DATETIME)
+
+        self.assertEquals(ti.execution_date, DEFAULT_DATE)
+
+        # with dag and localized execution_date
+        tz = pendulum.timezone("Europe/Amsterdam")
+        execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tz)
+        utc_date = timezone.convert_to_utc(execution_date)
+        ti = TI(task=op1, execution_date=execution_date)
+        self.assertEquals(ti.execution_date, utc_date)
 
     def test_set_dag(self):
         """


Mime
View raw message