airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-856] Make sure execution date is set for local client
Date Fri, 10 Feb 2017 13:17:51 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test adaebc2d7 -> fb88c2d83


[AIRFLOW-856] Make sure execution date is set for local client

In the local api client the execution date was
hard-coded to None.
Secondly, when no execution date was specified, the
execution date was set to datetime.now().
datetime.now() includes fractional seconds, which
are supported in the database but are not supported
in, among other places, the current logging setup.
Now we cut off fractional seconds for the
execution date.

Closes #2064 from bolkedebruin/AIRFLOW-856

(cherry picked from commit b7c828bf094d3aa1eae310979a82addf7e423bb0)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb88c2d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb88c2d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb88c2d8

Branch: refs/heads/v1-8-test
Commit: fb88c2d8362d751f902252c51c8bce4301ac8c40
Parents: adaebc2
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Fri Feb 10 14:17:26 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri Feb 10 14:17:44 2017 +0100

----------------------------------------------------------------------
 airflow/api/client/local_client.py             |   2 +-
 airflow/api/common/experimental/trigger_dag.py |   9 +-
 tests/__init__.py                              |   1 +
 tests/api/__init__.py                          |  17 ++++
 tests/api/client/__init__.py                   |  13 +++
 tests/api/client/local_client.py               | 107 ++++++++++++++++++++
 6 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/airflow/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index a4d1f93..05f27f6 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -21,5 +21,5 @@ class Client(api_client.Client):
         dr = trigger_dag.trigger_dag(dag_id=dag_id,
                                      run_id=run_id,
                                      conf=conf,
-                                     execution_date=None)
+                                     execution_date=execution_date)
         return "Created {}".format(dr)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 0905017..2c5a462 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -12,15 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
+import datetime
 import json
 
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun, DagBag
 from airflow.utils.state import State
 
-import logging
-
 
 def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dagbag = DagBag()
@@ -31,7 +29,10 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)
 
     if not execution_date:
-        execution_date = datetime.now()
+        execution_date = datetime.datetime.now()
+
+    assert isinstance(execution_date, datetime.datetime)
+    execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:
         run_id = "manual__{0}".format(execution_date.isoformat())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index e1e8551..7ddf22d 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -14,6 +14,7 @@
 
 from __future__ import absolute_import
 
+from .api import *
 from .configuration import *
 from .contrib import *
 from .core import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/tests/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/api/__init__.py b/tests/api/__init__.py
new file mode 100644
index 0000000..2db97ad
--- /dev/null
+++ b/tests/api/__init__.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from .client import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/tests/api/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/api/client/__init__.py b/tests/api/client/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/api/client/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb88c2d8/tests/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/tests/api/client/local_client.py b/tests/api/client/local_client.py
new file mode 100644
index 0000000..a36b71f
--- /dev/null
+++ b/tests/api/client/local_client.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+import datetime
+
+from mock import patch
+
+from airflow import AirflowException
+from airflow import models
+
+from airflow.api.client.local_client import Client
+from airflow.utils.state import State
+
+EXECDATE = datetime.datetime.now()
+EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0)
+EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat()
+
+real_datetime_class = datetime.datetime
+
+
+def mock_datetime_now(target, dt):
+    class DatetimeSubclassMeta(type):
+        @classmethod
+        def __instancecheck__(mcs, obj):
+            return isinstance(obj, real_datetime_class)
+
+    class BaseMockedDatetime(real_datetime_class):
+        @classmethod
+        def now(cls, tz=None):
+            return target.replace(tzinfo=tz)
+
+        @classmethod
+        def utcnow(cls):
+            return target
+
+    # Python2 & Python3 compatible metaclass
+    MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), {})
+
+    return patch.object(dt, 'datetime', MockedDatetime)
+
+
+class TestLocalClient(unittest.TestCase):
+    def setUp(self):
+        self.client = Client(api_base_url=None, auth=None)
+
+    @patch.object(models.DAG, 'create_dagrun')
+    def test_trigger_dag(self, mock):
+        client = self.client
+
+        # non existent
+        with self.assertRaises(AirflowException):
+            client.trigger_dag(dag_id="blablabla")
+
+        import airflow.api.common.experimental.trigger_dag
+        with mock_datetime_now(EXECDATE, airflow.api.common.experimental.trigger_dag.datetime):
+            # no execution date, execution date should be set automatically
+            client.trigger_dag(dag_id="test_start_date_scheduling")
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # execution date with microseconds cutoff
+            client.trigger_dag(dag_id="test_start_date_scheduling", execution_date=EXECDATE)
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # run id
+            run_id = "my_run_id"
+            client.trigger_dag(dag_id="test_start_date_scheduling", run_id=run_id)
+            mock.assert_called_once_with(run_id=run_id,
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # test conf
+            conf = '{"name": "John"}'
+            client.trigger_dag(dag_id="test_start_date_scheduling", conf=conf)
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=json.loads(conf),
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # this is a unit test only, cannot verify existing dag run


Mime
View raw message