airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davy...@apache.org
Subject [1/3] incubator-airflow git commit: [AIRFLOW-149] Task Dependency Engine + Why Isn't My Task Running View
Date Fri, 26 Aug 2016 22:08:02 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 16fac9890 -> f36041477


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/runnable_exec_date_dep.py b/tests/ti_deps/deps/runnable_exec_date_dep.py
new file mode 100644
index 0000000..ae09ddb
--- /dev/null
+++ b/tests/ti_deps/deps/runnable_exec_date_dep.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+from mock import patch
+
+from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
+from fake_models import FakeDag, FakeTask, FakeTI
+from tests.test_utils.fake_datetime import FakeDatetime
+
+
+class RunnableExecDateDepTest(unittest.TestCase):
+
+    @patch('airflow.ti_deps.deps.runnable_exec_date_dep.datetime', FakeDatetime)
+    def test_exec_date_after_end_date(self):
+        """
+        If the execution date is after the faked current time this dep should fail
+        """
+        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
+        dag = FakeDag(end_date=datetime(2016, 1, 3))
+        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
+
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
+
+    def test_exec_date_before_task_end_date(self):
+        """
+        Expects the dep to fail when the execution date precedes the task's end date.
+        NOTE(review): start_date may have been intended here, not end_date; confirm.
+        """
+        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
+        dag = FakeDag(end_date=datetime(2016, 1, 1))
+        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
+
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
+
+    def test_exec_date_after_task_end_date(self):
+        """
+        If the task instance execution date is after the task's end date this dep
+        should fail
+        """
+        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
+        dag = FakeDag(end_date=datetime(2016, 1, 3))
+        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 1))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
+
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
+
+    def test_exec_date_before_dag_end_date(self):
+        """
+        If the task instance execution date is before the DAG's start date this dep
+        should fail (the fixtures here set start_date, not end_date)
+        """
+        dag = FakeDag(start_date=datetime(2016, 1, 2))
+        task = FakeTask(dag=dag, start_date=datetime(2016, 1, 1))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
+
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
+
+    def test_exec_date_after_dag_end_date(self):
+        """
+        If the task instance execution date is after the DAG's end date this dep
+        should fail, even though it is still before the task's own end date
+        """
+        dag = FakeDag(end_date=datetime(2016, 1, 1))
+        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
+
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
+
+    def test_all_deps_met(self):
+        """
+        The dep should pass when the execution date precedes the task and DAG end dates
+        """
+        dag = FakeDag(end_date=datetime(2016, 1, 2))
+        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
+        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
+
+        self.assertTrue(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/trigger_rule_dep.py b/tests/ti_deps/deps/trigger_rule_dep.py
new file mode 100644
index 0000000..04a7737
--- /dev/null
+++ b/tests/ti_deps/deps/trigger_rule_dep.py
@@ -0,0 +1,295 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+from airflow.utils.state import State
+from fake_models import FakeTask, FakeTI
+
+
+class TriggerRuleDepTest(unittest.TestCase):
+
+    def test_no_upstream_tasks(self):
+        """
+        A task with an empty upstream_list has nothing to check, so the dep should pass
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_DONE,
+            upstream_list=[])
+        ti = FakeTI(
+            task=task,
+            state=State.UP_FOR_RETRY)
+
+        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))
+
+    def test_dummy_tr(self):
+        """
+        TriggerRule.DUMMY should pass this dep (exercised here with no upstream tasks)
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.DUMMY,
+            upstream_list=[])
+        ti = FakeTI(
+            task=task,
+            state=State.UP_FOR_RETRY)
+
+        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))
+
+    def test_one_success_tr_success(self):
+        """
+        ONE_SUCCESS with one upstream success should yield no failing statuses
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ONE_SUCCESS,
+            upstream_task_ids=[])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=2,
+            failed=2,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_one_success_tr_failure(self):
+        """
+        ONE_SUCCESS with zero upstream successes should yield a single failing status
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ONE_SUCCESS,
+            upstream_task_ids=[])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=2,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_one_failure_tr_failure(self):
+        """ONE_FAILED with no failed/upstream_failed upstreams yields a failing status"""
+        task = FakeTask(trigger_rule=TriggerRule.ONE_FAILED, upstream_task_ids=[])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        # Assert on the result; without this the test passes whatever the rule yields
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_one_failure_tr_success(self):
+        """
+        ONE_FAILED should yield no statuses when failed or upstream_failed is non-zero
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ONE_FAILED,
+            upstream_task_ids=[])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=2,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=0,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_success_tr_success(self):
+        """
+        ALL_SUCCESS with the only upstream task succeeded should yield no statuses
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_SUCCESS,
+            upstream_task_ids=["FakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_success_tr_failure(self):
+        """
+        ALL_SUCCESS with one of two upstreams failed should yield one failing status
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_SUCCESS,
+            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=1,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_all_failed_tr_success(self):
+        """
+        ALL_FAILED with both upstream tasks failed should yield no statuses
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_FAILED,
+            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=0,
+            failed=2,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_failed_tr_failure(self):
+        """
+        ALL_FAILED with both upstream tasks succeeded should yield one failing status
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_FAILED,
+            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_all_done_tr_success(self):
+        """
+        ALL_DONE with both upstream tasks done should yield no statuses
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_DONE,
+            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_done_tr_failure(self):
+        """
+        ALL_DONE with only one of two upstream tasks done should yield one failing status
+        """
+        task = FakeTask(
+            trigger_rule=TriggerRule.ALL_DONE,
+            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_unknown_tr(self):
+        """
+        An unrecognized trigger rule should yield a single failing status
+        """
+        task = FakeTask(
+            trigger_rule="Unknown Trigger Rule",
+            upstream_task_ids=[])
+        ti = FakeTI(task=task)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/valid_state_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/valid_state_dep.py b/tests/ti_deps/deps/valid_state_dep.py
new file mode 100644
index 0000000..6bc0835
--- /dev/null
+++ b/tests/ti_deps/deps/valid_state_dep.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+
+from airflow import AirflowException
+from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
+from airflow.utils.state import State
+from fake_models import FakeTI
+
+
+class ValidStateDepTest(unittest.TestCase):
+
+    def test_valid_state(self):
+        """
+        A TI whose state is in the dep's set of valid states should pass this dep
+        """
+        ti = FakeTI(state=State.QUEUED, end_date=datetime(2016, 1, 1))
+
+        self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti, dep_context=None))
+
+    def test_invalid_state(self):
+        """
+        A TI in SUCCESS should fail a dep whose only valid state is FAILED
+        """
+        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
+
+        self.assertFalse(ValidStateDep({State.FAILED}).is_met(ti=ti, dep_context=None))
+
+    def test_no_valid_states(self):
+        """
+        If there are no valid states the dependency should raise AirflowException
+        """
+        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
+
+        with self.assertRaises(AirflowException):
+            ValidStateDep({}).is_met(ti=ti, dep_context=None)


Mime
View raw message