airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] ksolan closed pull request #4257: Ksolan/pendulum 2.x
Date Thu, 29 Nov 2018 18:42:59 GMT
ksolan closed pull request #4257: Ksolan/pendulum 2.x
URL: https://github.com/apache/incubator-airflow/pull/4257
 
 
   

This pull request was submitted from a forked repository.
Because GitHub hides the original diff of such a pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/settings.py b/airflow/settings.py
index 8f8420ea22..dd0380f519 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -213,15 +213,15 @@ def dispose_orm():
 
 
 def configure_adapters():
-    from pendulum import Pendulum
+    from pendulum import datetime
     try:
         from sqlite3 import register_adapter
-        register_adapter(Pendulum, lambda val: val.isoformat(' '))
+        register_adapter(datetime, lambda val: val.isoformat(' '))
     except ImportError:
         pass
     try:
         import MySQLdb.converters
-        MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
+        MySQLdb.converters.conversions[datetime] = MySQLdb.converters.DateTime2literal
     except ImportError:
         pass
 
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 5adaa2f5c4..1edddb73ec 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -169,4 +169,4 @@ def parse(string, timezone=None):
     Parse a time string and return an aware datetime
     :param string: time string
     """
-    return pendulum.parse(string, tz=timezone or TIMEZONE)
+    return pendulum.parse(string, strict=False, tz=timezone or TIMEZONE)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8792191e21..ec674d7379 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -315,7 +315,7 @@ def get_chart_height(dag):
 def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
     dttm = request.args.get('execution_date')
     if dttm:
-        dttm = pendulum.parse(dttm)
+        dttm = pendulum.parse(dttm, strict=False)
     else:
         dttm = dag.latest_execution_date or timezone.utcnow()
 
@@ -729,7 +729,7 @@ def rendered(self):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         task = copy.copy(dag.get_task(task_id))
@@ -765,7 +765,7 @@ def get_logs_with_metadata(self, session=None):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         try_number = int(request.args.get('try_number'))
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
@@ -824,7 +824,7 @@ def log(self, session=None):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -851,7 +851,7 @@ def task(self):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -938,7 +938,7 @@ def xcom(self, session=None):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         if not dag or task_id not in dag.task_ids:
@@ -978,7 +978,7 @@ def run(self):
         task = dag.get_task(task_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         ignore_all_deps = request.args.get('ignore_all_deps') == "true"
         ignore_task_deps = request.args.get('ignore_task_deps') == "true"
         ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -1141,7 +1141,7 @@ def clear(self):
         dag = dagbag.get_dag(dag_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -1171,7 +1171,7 @@ def dagrun_clear(self):
         confirmed = request.args.get('confirmed') == "true"
 
         dag = dagbag.get_dag(dag_id)
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         start_date = execution_date
         end_date = execution_date
 
@@ -1206,7 +1206,7 @@ def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -1234,7 +1234,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -1289,7 +1289,7 @@ def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date,
         task = dag.get_task(task_id)
         task.dag = dag
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
 
         if not dag:
             flash("Cannot find DAG: {}".format(dag_id))
@@ -1447,7 +1447,7 @@ def recurse_nodes(task, visited):
             def set_duration(tid):
                 if isinstance(tid, dict) and tid.get("state") == State.RUNNING \
                         and tid["start_date"] is not None:
-                    d = timezone.utcnow() - pendulum.parse(tid["start_date"])
+                    d = timezone.utcnow() - pendulum.parse(tid["start_date"], strict=False)
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1606,7 +1606,7 @@ def duration(self, session=None):
             return redirect('/admin/')
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1714,7 +1714,7 @@ def tries(self, session=None):
         num_runs = int(num_runs) if num_runs else default_dag_run
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1778,7 +1778,7 @@ def landing_times(self, session=None):
         num_runs = int(num_runs) if num_runs else default_dag_run
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -2000,7 +2000,7 @@ def task_instances(self, session=None):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = pendulum.parse(dttm)
+            dttm = pendulum.parse(dttm, strict=False)
         else:
             return "Error: Invalid execution_date"
 
@@ -2939,7 +2939,7 @@ def get_one(self, id):
         https://github.com/flask-admin/flask-admin/issues/1226
         """
         task_id, dag_id, execution_date = iterdecode(id)
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         return self.session.query(self.model).get((task_id, dag_id, execution_date))
 
 
diff --git a/airflow/www_rbac/decorators.py b/airflow/www_rbac/decorators.py
index 8f962ef840..4215ac030d 100644
--- a/airflow/www_rbac/decorators.py
+++ b/airflow/www_rbac/decorators.py
@@ -47,7 +47,7 @@ def wrapper(*args, **kwargs):
 
         if 'execution_date' in request.args:
             log.execution_date = pendulum.parse(
-                request.args.get('execution_date'))
+                request.args.get('execution_date'), strict=False)
 
         session.add(log)
         session.commit()
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 49a9a734cc..643c099db5 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -81,7 +81,7 @@
 def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
     dttm = request.args.get('execution_date')
     if dttm:
-        dttm = pendulum.parse(dttm)
+        dttm = pendulum.parse(dttm, strict=False)
     else:
         dttm = dag.latest_execution_date or timezone.utcnow()
 
@@ -460,7 +460,7 @@ def rendered(self):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         task = copy.copy(dag.get_task(task_id))
@@ -498,7 +498,7 @@ def get_logs_with_metadata(self, session=None):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         try_number = int(request.args.get('try_number'))
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
@@ -558,7 +558,7 @@ def log(self, session=None):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -586,7 +586,7 @@ def task(self):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -663,7 +663,7 @@ def xcom(self, session=None):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = pendulum.parse(execution_date)
+        dttm = pendulum.parse(execution_date, strict=False)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         if not dag or task_id not in dag.task_ids:
@@ -703,7 +703,7 @@ def run(self):
         task = dag.get_task(task_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         ignore_all_deps = request.args.get('ignore_all_deps') == "true"
         ignore_task_deps = request.args.get('ignore_task_deps') == "true"
         ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -868,7 +868,7 @@ def clear(self):
         dag = dagbag.get_dag(dag_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -898,7 +898,7 @@ def dagrun_clear(self):
         confirmed = request.args.get('confirmed') == "true"
 
         dag = dagbag.get_dag(dag_id)
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         start_date = execution_date
         end_date = execution_date
 
@@ -940,7 +940,7 @@ def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -968,7 +968,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -1023,7 +1023,7 @@ def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date,
         task = dag.get_task(task_id)
         task.dag = dag
 
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
 
         if not dag:
             flash("Cannot find DAG: {}".format(dag_id))
@@ -1182,7 +1182,7 @@ def recurse_nodes(task, visited):
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
                         tid["start_date"] is not None):
-                    d = timezone.utcnow() - pendulum.parse(tid["start_date"])
+                    d = timezone.utcnow() - pendulum.parse(tid["start_date"], strict=False)
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1348,7 +1348,7 @@ def duration(self, session=None):
             return redirect('/')
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1456,7 +1456,7 @@ def tries(self, session=None):
         num_runs = int(num_runs) if num_runs else default_dag_run
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1521,7 +1521,7 @@ def landing_times(self, session=None):
         num_runs = int(num_runs) if num_runs else default_dag_run
 
         if base_date:
-            base_date = pendulum.parse(base_date)
+            base_date = pendulum.parse(base_date, strict=False)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1755,7 +1755,7 @@ def task_instances(self, session=None):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = pendulum.parse(dttm)
+            dttm = pendulum.parse(dttm, strict=False)
         else:
             return "Error: Invalid execution_date"
 
@@ -2389,7 +2389,7 @@ def get_one(self, id):
         Flask-Admin side. https://github.com/flask-admin/flask-admin/issues/1226
         """
         task_id, dag_id, execution_date = iterdecode(id)  # noqa
-        execution_date = pendulum.parse(execution_date)
+        execution_date = pendulum.parse(execution_date, strict=False)
         return self.session.query(self.model).get((task_id, dag_id, execution_date))
 
 
diff --git a/setup.py b/setup.py
index aa80026fc9..db534db09d 100644
--- a/setup.py
+++ b/setup.py
@@ -315,7 +315,7 @@ def do_setup():
             'lxml>=4.0.0',
             'markdown>=2.5.2, <3.0',
             'pandas>=0.17.1, <1.0.0',
-            'pendulum==1.4.4',
+            'pendulum==2.0.3',
             'psutil>=4.2.0, <6.0.0',
             'pygments>=2.0.1, <3.0',
             'python-daemon>=2.1.1, <2.2',
diff --git a/tests/core.py b/tests/core.py
index b297edcbcd..185ab15322 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -24,6 +24,7 @@
 
 import bleach
 import doctest
+import pendulum
 import mock
 import multiprocessing
 import os
@@ -68,7 +69,6 @@
 from airflow.configuration import AirflowConfigException, run_command
 from jinja2.sandbox import SecurityError
 from jinja2 import UndefinedError
-from pendulum import utcnow
 
 import six
 
@@ -311,7 +311,8 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):
         """
         session = settings.Session()
         delta = timedelta(days=1)
-        now = utcnow()
+        now = datetime.datetime.utcnow()\
+                               .replace(tzinfo=pendulum.timezone('UTC'))
         start_date = now.subtract(weeks=1)
 
         runs = (now - start_date).days
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index e5d8cdf2b1..b73df2e21c 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -21,12 +21,16 @@
 from datetime import datetime
 from mock import Mock
 
+import pendulum
 from airflow.models import DAG, BaseOperator
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.utils.state import State
 
 
+UTC = pendulum.timezone('UTC')
+
+
 class PrevDagrunDepTest(unittest.TestCase):
 
     def _get_task(self, **kwargs):
@@ -42,9 +46,9 @@ def test_not_depends_on_past(self):
                               wait_for_downstream=False)
         prev_ti = Mock(task=task, state=State.SUCCESS,
                        are_dependents_done=Mock(return_value=True),
-                       execution_date=datetime(2016, 1, 2))
+                       execution_date=datetime(2016, 1, 2, tzinfo=UTC))
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 3))
+                  execution_date=datetime(2016, 1, 3, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=False)
 
         self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -59,9 +63,9 @@ def test_context_ignore_depends_on_past(self):
                               wait_for_downstream=False)
         prev_ti = Mock(task=task, state=State.SUCCESS,
                        are_dependents_done=Mock(return_value=True),
-                       execution_date=datetime(2016, 1, 2))
+                       execution_date=datetime(2016, 1, 2, tzinfo=UTC))
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 3))
+                  execution_date=datetime(2016, 1, 3, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=True)
 
         self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -75,7 +79,7 @@ def test_first_task_run(self):
                               wait_for_downstream=False)
         prev_ti = None
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 1))
+                  execution_date=datetime(2016, 1, 1, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=False)
 
         self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -90,7 +94,7 @@ def test_prev_ti_bad_state(self):
         prev_ti = Mock(state=State.NONE,
                        are_dependents_done=Mock(return_value=True))
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 2))
+                  execution_date=datetime(2016, 1, 2, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=False)
 
         self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -107,7 +111,7 @@ def test_failed_wait_for_downstream(self):
         prev_ti = Mock(state=State.SUCCESS,
                        are_dependents_done=Mock(return_value=False))
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 2))
+                  execution_date=datetime(2016, 1, 2, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=False)
 
         self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -122,7 +126,7 @@ def test_all_met(self):
         prev_ti = Mock(state=State.SUCCESS,
                        are_dependents_done=Mock(return_value=True))
         ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 2))
+                  execution_date=datetime(2016, 1, 2, tzinfo=UTC))
         dep_context = DepContext(ignore_depends_on_past=False)
 
         self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message