Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 06712200B7D for ; Sat, 27 Aug 2016 00:08:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 04EAC160AC3; Fri, 26 Aug 2016 22:08:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F0D35160AB6 for ; Sat, 27 Aug 2016 00:08:07 +0200 (CEST) Received: (qmail 24323 invoked by uid 500); 26 Aug 2016 22:08:07 -0000 Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.incubator.apache.org Delivered-To: mailing list commits@airflow.incubator.apache.org Received: (qmail 24313 invoked by uid 99); 26 Aug 2016 22:08:07 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Aug 2016 22:08:07 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id B9377C0439 for ; Fri, 26 Aug 2016 22:08:06 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id qo17MxkqoO1O for ; Fri, 26 Aug 2016 22:08:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org 
[140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 6C6BF5FCC7 for ; Fri, 26 Aug 2016 22:08:03 +0000 (UTC) Received: (qmail 24244 invoked by uid 99); 26 Aug 2016 22:08:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Aug 2016 22:08:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3A16DFF57; Fri, 26 Aug 2016 22:08:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davydov@apache.org To: commits@airflow.incubator.apache.org Date: Fri, 26 Aug 2016 22:08:02 -0000 Message-Id: <33a3b81665034402afa8307aaf2ef9b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] incubator-airflow git commit: [AIRFLOW-149] Task Dependency Engine + Why Isn't My Task Running View archived-at: Fri, 26 Aug 2016 22:08:09 -0000 Repository: incubator-airflow Updated Branches: refs/heads/master 16fac9890 -> f36041477 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/runnable_exec_date_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/runnable_exec_date_dep.py b/tests/ti_deps/deps/runnable_exec_date_dep.py new file mode 100644 index 0000000..ae09ddb --- /dev/null +++ b/tests/ti_deps/deps/runnable_exec_date_dep.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
class RunnableExecDateDepTest(unittest.TestCase):
    """
    Tests for RunnableExecDateDep.

    Every test builds a fake DAG / task / task instance with specific dates
    and checks the result of ``RunnableExecDateDep().is_met``.
    """

    # Dotted path of the ``datetime`` name inside the module under test. The
    # tests that freeze "now" must patch this name, otherwise the dep keeps
    # using the real clock and the value set on FakeDatetime is never read.
    _DATETIME_PATH = 'airflow.ti_deps.deps.runnable_exec_date_dep.datetime'

    @patch(_DATETIME_PATH, FakeDatetime)
    def test_exec_date_after_end_date(self):
        """
        With "now" frozen at 2016-01-01, an execution date of 2016-01-02 is in
        the future, so this dep should fail even though the execution date is
        before both end dates
        """
        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
        dag = FakeDag(end_date=datetime(2016, 1, 3))
        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))

        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

    @patch(_DATETIME_PATH, FakeDatetime)
    def test_exec_date_before_task_end_date(self):
        """
        The execution date (2016-01-01) is before the task's end date
        (2016-01-02) and equal to the DAG's end date (2016-01-01); the test
        expects this dep to fail

        NOTE(review): test_all_deps_met expects success when the execution date
        is before both end dates, so this expectation looks suspicious --
        confirm against RunnableExecDateDep.
        """
        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
        dag = FakeDag(end_date=datetime(2016, 1, 1))
        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))

        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

    @patch(_DATETIME_PATH, FakeDatetime)
    def test_exec_date_after_task_end_date(self):
        """
        If the task instance execution date is after the task's end date this
        dep should fail
        """
        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
        dag = FakeDag(end_date=datetime(2016, 1, 3))
        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 1))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))

        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

    def test_exec_date_before_dag_end_date(self):
        """
        The execution date (2016-01-01) is before the DAG's start date
        (2016-01-02); the test expects this dep to fail

        NOTE(review): the test name mentions the DAG's end date but the fixture
        only sets start dates -- confirm which condition is intended.
        """
        dag = FakeDag(start_date=datetime(2016, 1, 2))
        task = FakeTask(dag=dag, start_date=datetime(2016, 1, 1))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))

        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

    def test_exec_date_after_dag_end_date(self):
        """
        If the task instance execution date is after the dag's end date this
        dep should fail
        """
        dag = FakeDag(end_date=datetime(2016, 1, 1))
        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))

        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

    def test_all_deps_met(self):
        """
        An execution date before both the DAG's and the task's end date should
        pass this dep
        """
        dag = FakeDag(end_date=datetime(2016, 1, 2))
        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))

        self.assertTrue(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
class TriggerRuleDepTest(unittest.TestCase):
    """
    Tests for TriggerRuleDep.

    The first two tests go through the public ``is_met`` entry point; the
    others call ``_evaluate_trigger_rule`` directly with hand-picked upstream
    counters and inspect the dep statuses it produces.
    """

    def _evaluate(self, trigger_rule, upstream_task_ids, successes, skipped,
                  failed, upstream_failed, done):
        """
        Build a fake task/TI with the given trigger rule and upstream task ids,
        run ``_evaluate_trigger_rule`` with the given counters and return the
        resulting dep statuses as a tuple.
        """
        task = FakeTask(
            trigger_rule=trigger_rule,
            upstream_task_ids=upstream_task_ids)
        ti = FakeTI(task=task)

        # Materialize the returned iterable so callers can use len()/indexing.
        return tuple(TriggerRuleDep()._evaluate_trigger_rule(
            ti=ti,
            successes=successes,
            skipped=skipped,
            failed=failed,
            upstream_failed=upstream_failed,
            done=done,
            flag_upstream_failed=False,
            session="Fake Session"))

    def _assert_rule_met(self, dep_statuses):
        """The rule is satisfied when no dep status is reported."""
        self.assertEqual(len(dep_statuses), 0)

    def _assert_rule_not_met(self, dep_statuses):
        """The rule is violated when exactly one non-passing status is reported."""
        self.assertEqual(len(dep_statuses), 1)
        self.assertFalse(dep_statuses[0].passed)

    def test_no_upstream_tasks(self):
        """
        If the TI has no upstream TIs then there is nothing to check and the dep is passed
        """
        task = FakeTask(
            trigger_rule=TriggerRule.ALL_DONE,
            upstream_list=[])
        ti = FakeTI(
            task=task,
            state=State.UP_FOR_RETRY)

        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))

    def test_dummy_tr(self):
        """
        The dummy trigger rule should always pass this dep
        """
        task = FakeTask(
            trigger_rule=TriggerRule.DUMMY,
            upstream_list=[])
        ti = FakeTI(
            task=task,
            state=State.UP_FOR_RETRY)

        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))

    def test_one_success_tr_success(self):
        """
        One-success trigger rule success
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ONE_SUCCESS,
            upstream_task_ids=[],
            successes=1,
            skipped=2,
            failed=2,
            upstream_failed=2,
            done=2)

        self._assert_rule_met(dep_statuses)

    def test_one_success_tr_failure(self):
        """
        One-success trigger rule failure
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ONE_SUCCESS,
            upstream_task_ids=[],
            successes=0,
            skipped=2,
            failed=2,
            upstream_failed=2,
            done=2)

        self._assert_rule_not_met(dep_statuses)

    def test_one_failure_tr_failure(self):
        """
        One-failure trigger rule failure
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ONE_FAILED,
            upstream_task_ids=[],
            successes=2,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=2)

        # This test previously computed the statuses without checking them;
        # assert the same outcome as the other *_failure tests.
        self._assert_rule_not_met(dep_statuses)

    def test_one_failure_tr_success(self):
        """
        One-failure trigger rule success
        """
        # A directly failed upstream satisfies the rule ...
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ONE_FAILED,
            upstream_task_ids=[],
            successes=0,
            skipped=2,
            failed=2,
            upstream_failed=0,
            done=2)

        self._assert_rule_met(dep_statuses)

        # ... and so does an upstream in the upstream_failed state.
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ONE_FAILED,
            upstream_task_ids=[],
            successes=0,
            skipped=2,
            failed=0,
            upstream_failed=2,
            done=2)

        self._assert_rule_met(dep_statuses)

    def test_all_success_tr_success(self):
        """
        All-success trigger rule success
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_SUCCESS,
            upstream_task_ids=["FakeTaskID"],
            successes=1,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=1)

        self._assert_rule_met(dep_statuses)

    def test_all_success_tr_failure(self):
        """
        All-success trigger rule failure
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_SUCCESS,
            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"],
            successes=1,
            skipped=0,
            failed=1,
            upstream_failed=0,
            done=2)

        self._assert_rule_not_met(dep_statuses)

    def test_all_failed_tr_success(self):
        """
        All-failed trigger rule success
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_FAILED,
            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"],
            successes=0,
            skipped=0,
            failed=2,
            upstream_failed=0,
            done=2)

        self._assert_rule_met(dep_statuses)

    def test_all_failed_tr_failure(self):
        """
        All-failed trigger rule failure
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_FAILED,
            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"],
            successes=2,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=2)

        self._assert_rule_not_met(dep_statuses)

    def test_all_done_tr_success(self):
        """
        All-done trigger rule success
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_DONE,
            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"],
            successes=2,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=2)

        self._assert_rule_met(dep_statuses)

    def test_all_done_tr_failure(self):
        """
        All-done trigger rule failure
        """
        dep_statuses = self._evaluate(
            trigger_rule=TriggerRule.ALL_DONE,
            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"],
            successes=1,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=1)

        self._assert_rule_not_met(dep_statuses)

    def test_unknown_tr(self):
        """
        Unknown trigger rules should cause this dep to fail
        """
        dep_statuses = self._evaluate(
            trigger_rule="Unknown Trigger Rule",
            upstream_task_ids=[],
            successes=1,
            skipped=0,
            failed=0,
            upstream_failed=0,
            done=1)

        self._assert_rule_not_met(dep_statuses)
class ValidStateDepTest(unittest.TestCase):
    """
    Tests for ValidStateDep, which is constructed with a collection of
    acceptable task instance states.
    """

    def test_valid_state(self):
        """
        Valid state should pass this dep
        """
        ti = FakeTI(state=State.QUEUED, end_date=datetime(2016, 1, 1))

        self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti, dep_context=None))

    def test_invalid_state(self):
        """
        Invalid state should fail this dep
        """
        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))

        # The failed-state constant is State.FAILED; State.FAILURE is not
        # defined and would raise AttributeError before the dep is exercised.
        self.assertFalse(ValidStateDep({State.FAILED}).is_met(ti=ti, dep_context=None))

    def test_no_valid_states(self):
        """
        If there are no valid states the dependency should throw
        """
        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))

        # set() rather than {}: the latter is an empty dict, not an empty set
        # of states.
        with self.assertRaises(AirflowException):
            ValidStateDep(set()).is_met(ti=ti, dep_context=None)