From: criccomini@apache.org
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Date: Tue, 09 May 2017 17:35:43 -0000
Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm
X-Mailer: ASF-Git Admin Mailer
Subject: [14/36] incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency checker
raise an exception as the previous schedule will not exist.

Also activates all ti_deps tests.
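To illustrate the failure described above, here is a minimal sketch (not the Airflow code itself) of why a missing previous schedule needs an explicit None guard before any date comparison. The helper name `skips_previous_run_check` and its parameters are hypothetical, introduced only for this example; the guard mirrors the one this commit adds to PrevDagrunDep.

```python
from datetime import datetime


def skips_previous_run_check(previous_schedule, start_date):
    """Hypothetical helper: True when there is no earlier run to wait for."""
    # A DAG with no schedule (None) or @once has no previous schedule,
    # so there is no previous run to depend on.
    if previous_schedule is None:
        return True
    # Otherwise the previous run is irrelevant only if it would fall
    # before the task's start date (i.e. this is the first run).
    return previous_schedule < start_date


start = datetime(2016, 1, 1)
print(skips_previous_run_check(None, start))
print(skips_previous_run_check(datetime(2015, 12, 31), start))
print(skips_previous_run_check(datetime(2016, 1, 2), start))

# Without the guard, the comparison itself fails:
try:
    None < start
except TypeError as exc:
    print("unguarded comparison raised:", type(exc).__name__)
```

The guard has to come first because the ordering comparison is what raises; checking for None afterwards would be too late.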
Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7

Branch: refs/heads/v1-8-stable
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin
Committed: Thu Apr 6 14:03:24 2017 +0200

----------------------------------------------------------------------
 airflow/ti_deps/deps/base_ti_dep.py             |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py         |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py          |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py         |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 ----
 tests/ti_deps/deps/not_running_dep.py           |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py           |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py        |  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py           | 143 ---------
 tests/ti_deps/deps/runnable_exec_date_dep.py    |  92 ------
 .../deps/test_dag_ti_slots_available_dep.py     |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py     |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py    |  40 +++
 .../deps/test_not_in_retry_period_dep.py        |  59 ++++
 tests/ti_deps/deps/test_not_running_dep.py      |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py      |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py      | 123 ++++++++
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +++++
 tests/ti_deps/deps/test_trigger_rule_dep.py     | 252 ++++++++++++++++
 tests/ti_deps/deps/test_valid_state_dep.py      |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py          | 295 -------------------
 tests/ti_deps/deps/valid_state_dep.py           |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError
 
     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status
 
     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance. A
         dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
                    self.get_dep_statuses(ti, session, dep_context))
 
     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that explain why this dependency wasn't met.
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 7d4baa8..5455fb5 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -41,6 +41,11 @@ class PrevDagrunDep(BaseTIDep):
         # Don't depend on the previous task instance if we are the first task
         dag = ti.task.dag
         if dag.catchup:
+            if dag.previous_schedule(ti.execution_date) is None:
+                yield self._passing_status(
+                    reason="This task does not have a schedule or is @once"
+                )
+                return
             if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
                 yield self._passing_status(
                     reason="This task instance was the first task instance for its task.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dag_ti_slots_available_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_ti_slots_available_dep.py b/tests/ti_deps/deps/dag_ti_slots_available_dep.py
deleted file mode 100644
index 6077d96..0000000
--- a/tests/ti_deps/deps/dag_ti_slots_available_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagTISlotsAvailableDepTest(unittest.TestCase):
-
-    def test_concurrency_reached(self):
-        """
-        Test concurrency reached should fail dep
-        """
-        dag = FakeDag(concurrency=1, concurrency_reached=True)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_conditions_met(self):
-        """
-        Test all conditions met should pass dep
-        """
-        dag = FakeDag(concurrency=1, concurrency_reached=False)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dag_unpaused_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_unpaused_dep.py b/tests/ti_deps/deps/dag_unpaused_dep.py
deleted file mode 100644
index 8721a51..0000000
--- a/tests/ti_deps/deps/dag_unpaused_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagUnpausedDepTest(unittest.TestCase):
-
-    def test_concurrency_reached(self):
-        """
-        Test paused DAG should fail dependency
-        """
-        dag = FakeDag(is_paused=True)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagUnpausedDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_conditions_met(self):
-        """
-        Test all conditions met should pass dep
-        """
-        dag = FakeDag(is_paused=False)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagUnpausedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dagrun_exists_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dagrun_exists_dep.py b/tests/ti_deps/deps/dagrun_exists_dep.py
deleted file mode 100644
index 1141647..0000000
--- a/tests/ti_deps/deps/dagrun_exists_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagrunRunningDepTest(unittest.TestCase):
-
-    def test_dagrun_doesnt_exist(self):
-        """
-        Task instances without dagruns should fail this dep
-        """
-        dag = FakeDag(running_dagruns=[], max_active_runs=1)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(dagrun=None, task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagrunRunningDep().is_met(ti=ti, dep_context=None))
-
-    def test_dagrun_exists(self):
-        """
-        Task instances with a dagrun should pass this dep
-        """
-        dag = FakeDag(running_dagruns=[], max_active_runs=1)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(dagrun="Fake Dagrun", task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagrunRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_in_retry_period_dep.py b/tests/ti_deps/deps/not_in_retry_period_dep.py
deleted file mode 100644
index a6657ba..0000000
--- a/tests/ti_deps/deps/not_in_retry_period_dep.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime, timedelta
-
-from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
-from airflow.utils.state import State
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class NotInRetryPeriodDepTest(unittest.TestCase):
-
-    def test_still_in_retry_period(self):
-        """
-        Task instances that are in their retry period should fail this dep
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY,
-            end_date=datetime(2016, 1, 1),
-            is_premature=True)
-
-        self.assertFalse(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
-
-    def test_retry_period_finished(self):
-        """
-        Task instance's that have had their retry period elapse should pass this dep
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY,
-            end_date=datetime(2016, 1, 1),
-            is_premature=False)
-
-        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_in_retry_period(self):
-        """
-        Task instance's that are not up for retry can not be in their retry period
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, state=State.SUCCESS)
-
-        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_running_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_running_dep.py b/tests/ti_deps/deps/not_running_dep.py
deleted file mode 100644
index 159d923..0000000
--- a/tests/ti_deps/deps/not_running_dep.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-
-from airflow.ti_deps.deps.not_running_dep import NotRunningDep
-from airflow.utils.state import State
-from fake_models import FakeTI
-
-
-class NotRunningDepTest(unittest.TestCase):
-
-    def test_ti_running(self):
-        """
-        Running task instances should fail this dep
-        """
-        ti = FakeTI(state=State.RUNNING, start_date=datetime(2016, 1, 1))
-
-        self.assertFalse(NotRunningDep().is_met(ti=ti, dep_context=None))
-
-    def test_ti_not_running(self):
-        """
-        Non-running task instances should pass this dep
-        """
-        ti = FakeTI(state=State.NONE, start_date=datetime(2016, 1, 1))
-
-        self.assertTrue(NotRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_skipped_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_skipped_dep.py b/tests/ti_deps/deps/not_skipped_dep.py
deleted file mode 100644
index 6d7ef55..0000000
--- a/tests/ti_deps/deps/not_skipped_dep.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep
-from airflow.utils.state import State
-from fake_models import FakeTI
-
-
-class NotSkippedDepTest(unittest.TestCase):
-
-    def test_skipped(self):
-        """
-        Skipped task instances should fail this dep
-        """
-        ti = FakeTI(state=State.SKIPPED)
-
-        self.assertFalse(NotSkippedDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_skipped(self):
-        """
-        Non-skipped task instances should pass this dep
-        """
-        ti = FakeTI(state=State.RUNNING)
-
-        self.assertTrue(NotSkippedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/pool_has_space_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/pool_has_space_dep.py b/tests/ti_deps/deps/pool_has_space_dep.py
deleted file mode 100644
index 411547a..0000000
--- a/tests/ti_deps/deps/pool_has_space_dep.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.pool_has_space_dep import PoolHasSpaceDep
-from fake_models import FakeTI
-
-
-class PoolHasSpaceDepTest(unittest.TestCase):
-
-    def test_pool_full(self):
-        """
-        Full pools should fail this dep
-        """
-        ti = FakeTI(pool="fake_pool", pool_filled=True)
-
-        self.assertFalse(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_skipped(self):
-        """
-        Pools with room should pass this dep
-        """
-        ti = FakeTI(pool="fake_pool", pool_filled=False)
-
-        self.assertTrue(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/prev_dagrun_dep.py b/tests/ti_deps/deps/prev_dagrun_dep.py
deleted file mode 100644
index 4873467..0000000
--- a/tests/ti_deps/deps/prev_dagrun_dep.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-
-from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
-from airflow.utils.state import State
-from fake_models import FakeContext, FakeTask, FakeTI
-
-
-class PrevDagrunDepTest(unittest.TestCase):
-
-    def test_not_depends_on_past(self):
-        """
-        If depends on past isn't set in the task then the previous dagrun should be
-        ignored, even though there is no previous_ti which would normally fail the dep
-        """
-        task = FakeTask(
-            depends_on_past=False,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            task=task,
-            execution_date=datetime(2016, 1, 2),
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 3))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_context_ignore_depends_on_past(self):
-        """
-        If the context overrides depends_on_past then the dep should be met, even though
-        there is no previous_ti which would normally fail the dep
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            task=task,
-            execution_date=datetime(2016, 1, 2),
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 3))
-        dep_context = FakeContext(ignore_depends_on_past=True)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_first_task_run(self):
-        """
-        The first task run for a TI should pass since it has no previous dagrun.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = None
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 1))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_prev_ti_bad_state(self):
-        """
-        If the previous TI did not complete execution this dep should fail.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            state=State.NONE,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_failed_wait_for_downstream(self):
-        """
-        If the previous TI specified to wait for the downstream tasks of the previous
-        dagrun then it should fail this dep if the downstream TIs of the previous TI are
-        not done.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=True)
-        prev_ti = FakeTI(
-            state=State.SUCCESS,
-            dependents_done=False)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_all_met(self):
-        """
-        Test to make sure all of the conditions for the dep are met
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=True)
-        prev_ti = FakeTI(
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/runnable_exec_date_dep.py b/tests/ti_deps/deps/runnable_exec_date_dep.py
deleted file mode 100644
index ae09ddb..0000000
--- a/tests/ti_deps/deps/runnable_exec_date_dep.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-from mock import patch
-
-from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
-from fake_models import FakeDag, FakeTask, FakeTI
-from tests.test_utils.fake_datetime import FakeDatetime
-
-
-class RunnableExecDateDepTest(unittest.TestCase):
-
-    @patch('airflow.ti_deps.deps.runnable_exec_date_dep.datetime', FakeDatetime)
-    def test_exec_date_after_end_date(self):
-        """
-        If the dag's execution date is in the future this dep should fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
-        dag = FakeDag(end_date=datetime(2016, 1, 3))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_before_task_end_date(self):
-        """
-        If the task instance execution date is before the DAG's end date this dep should
-        fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
-        dag = FakeDag(end_date=datetime(2016, 1, 1))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_after_task_end_date(self):
-        """
-        If the task instance execution date is after the DAG's end date this dep should
-        fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
-        dag = FakeDag(end_date=datetime(2016, 1, 3))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 1))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_before_dag_end_date(self):
-        """
-        If the task instance execution date is before the dag's end date this dep should
-        fail
-        """
-        dag = FakeDag(start_date=datetime(2016, 1, 2))
-        task = FakeTask(dag=dag, start_date=datetime(2016, 1, 1))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_after_dag_end_date(self):
-        """
-        If the task instance execution date is after the dag's end date this dep should
-        fail
-        """
-        dag = FakeDag(end_date=datetime(2016, 1, 1))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_deps_met(self):
-        """
-        Test to make sure all of the conditions for the dep are met
-        """
-        dag = FakeDag(end_date=datetime(2016, 1, 2))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertTrue(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
new file mode 100644
index 0000000..6910b66
--- /dev/null
+++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import Mock
+
+from airflow.models import TaskInstance
+from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
+
+
+class DagTISlotsAvailableDepTest(unittest.TestCase):
+
+    def test_concurrency_reached(self):
+        """
+        Test concurrency reached should fail dep
+        """
+        dag = Mock(concurrency=1, concurrency_reached=True)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task, execution_date=None)
+
+        self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti))
+
+    def test_all_conditions_met(self):
+        """
+        Test all conditions met should pass dep
+        """
+        dag = Mock(concurrency=1, concurrency_reached=False)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task, execution_date=None)
+
+        self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dag_unpaused_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py
new file mode 100644
index 0000000..969889a
--- /dev/null
+++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import unittest +from mock import Mock + +from airflow.models import TaskInstance +from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep + + +class DagUnpausedDepTest(unittest.TestCase): + + def test_concurrency_reached(self): + """ + Test paused DAG should fail dependency + """ + dag = Mock(is_paused=True) + task = Mock(dag=dag) + ti = TaskInstance(task=task, execution_date=None) + + self.assertFalse(DagUnpausedDep().is_met(ti=ti)) + + def test_all_conditions_met(self): + """ + Test all conditions met should pass dep + """ + dag = Mock(is_paused=False) + task = Mock(dag=dag) + ti = TaskInstance(task=task, execution_date=None) + + self.assertTrue(DagUnpausedDep().is_met(ti=ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dagrun_exists_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_dagrun_exists_dep.py b/tests/ti_deps/deps/test_dagrun_exists_dep.py new file mode 100644 index 0000000..daad269 --- /dev/null +++ b/tests/ti_deps/deps/test_dagrun_exists_dep.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from airflow.utils.state import State +from mock import Mock, patch + +from airflow.models import DAG, DagRun +from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep + + +class DagrunRunningDepTest(unittest.TestCase): + + @patch('airflow.models.DagRun.find', return_value=()) + def test_dagrun_doesnt_exist(self, dagrun_find): + """ + Task instances without dagruns should fail this dep + """ + dag = DAG('test_dag', max_active_runs=2) + ti = Mock(task=Mock(dag=dag), get_dagrun=Mock(return_value=None)) + self.assertFalse(DagrunRunningDep().is_met(ti=ti)) + + def test_dagrun_exists(self): + """ + Task instances with a dagrun should pass this dep + """ + dagrun = DagRun(state=State.RUNNING) + ti = Mock(get_dagrun=Mock(return_value=dagrun)) + self.assertTrue(DagrunRunningDep().is_met(ti=ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_in_retry_period_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py new file mode 100644 index 0000000..0f23aab --- /dev/null +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from datetime import datetime, timedelta +from freezegun import freeze_time +from mock import Mock + +from airflow.models import TaskInstance +from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep +from airflow.utils.state import State + + +class NotInRetryPeriodDepTest(unittest.TestCase): + + def _get_task_instance(self, state, end_date=None, + retry_delay=timedelta(minutes=15)): + task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False) + ti = TaskInstance(task=task, state=state, execution_date=None) + ti.end_date = end_date + return ti + + @freeze_time('2016-01-01 15:44') + def test_still_in_retry_period(self): + """ + Task instances that are in their retry period should fail this dep + """ + ti = self._get_task_instance(State.UP_FOR_RETRY, + end_date=datetime(2016, 1, 1, 15, 30)) + self.assertTrue(ti.is_premature) + self.assertFalse(NotInRetryPeriodDep().is_met(ti=ti)) + + @freeze_time('2016-01-01 15:46') + def test_retry_period_finished(self): + """ + Task instance's that have had their retry period elapse should pass this dep + """ + ti = self._get_task_instance(State.UP_FOR_RETRY, + end_date=datetime(2016, 1, 1)) + self.assertFalse(ti.is_premature) + self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti)) + + def test_not_in_retry_period(self): + """ + Task instance's that are not up for retry can not be in their retry period + """ + ti = self._get_task_instance(State.SUCCESS) + self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_running_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_not_running_dep.py b/tests/ti_deps/deps/test_not_running_dep.py new file mode 100644 index 0000000..7f8f0cd --- /dev/null +++ b/tests/ti_deps/deps/test_not_running_dep.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 
2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from datetime import datetime +from mock import Mock + +from airflow.ti_deps.deps.not_running_dep import NotRunningDep +from airflow.utils.state import State + + +class NotRunningDepTest(unittest.TestCase): + + def test_ti_running(self): + """ + Running task instances should fail this dep + """ + ti = Mock(state=State.RUNNING, start_date=datetime(2016, 1, 1)) + self.assertFalse(NotRunningDep().is_met(ti=ti)) + + def test_ti_not_running(self): + """ + Non-running task instances should pass this dep + """ + ti = Mock(state=State.NONE, start_date=datetime(2016, 1, 1)) + self.assertTrue(NotRunningDep().is_met(ti=ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_skipped_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_not_skipped_dep.py b/tests/ti_deps/deps/test_not_skipped_dep.py new file mode 100644 index 0000000..8a31bf9 --- /dev/null +++ b/tests/ti_deps/deps/test_not_skipped_dep.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from mock import Mock + +from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep +from airflow.utils.state import State + + +class NotSkippedDepTest(unittest.TestCase): + + def test_skipped(self): + """ + Skipped task instances should fail this dep + """ + ti = Mock(state=State.SKIPPED) + self.assertFalse(NotSkippedDep().is_met(ti=ti)) + + def test_not_skipped(self): + """ + Non-skipped task instances should pass this dep + """ + ti = Mock(state=State.RUNNING) + self.assertTrue(NotSkippedDep().is_met(ti=ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_prev_dagrun_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py new file mode 100644 index 0000000..0f6f5da --- /dev/null +++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from datetime import datetime +from mock import Mock + +from airflow.models import DAG, BaseOperator +from airflow.ti_deps.dep_context import DepContext +from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep +from airflow.utils.state import State + + +class PrevDagrunDepTest(unittest.TestCase): + + def _get_task(self, **kwargs): + return BaseOperator(task_id='test_task', dag=DAG('test_dag'), **kwargs) + + def test_not_depends_on_past(self): + """ + If depends on past isn't set in the task then the previous dagrun should be + ignored, even though there is no previous_ti which would normally fail the dep + """ + task = self._get_task(depends_on_past=False, + start_date=datetime(2016, 1, 1), + wait_for_downstream=False) + prev_ti = Mock(task=task, state=State.SUCCESS, + are_dependents_done=Mock(return_value=True), + execution_date=datetime(2016, 1, 2)) + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 3)) + dep_context = DepContext(ignore_depends_on_past=False) + + self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) + + def test_context_ignore_depends_on_past(self): + """ + If the context overrides depends_on_past then the dep should be met, + even though there is no previous_ti which would normally fail the dep + """ + task = self._get_task(depends_on_past=True, + start_date=datetime(2016, 1, 1), + wait_for_downstream=False) + prev_ti = Mock(task=task, state=State.SUCCESS, + are_dependents_done=Mock(return_value=True), + execution_date=datetime(2016, 1, 2)) + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 3)) + dep_context = DepContext(ignore_depends_on_past=True) + + self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) + + def test_first_task_run(self): + """ + The first task run for a TI should pass since it has no previous dagrun. 
+ """ + task = self._get_task(depends_on_past=True, + start_date=datetime(2016, 1, 1), + wait_for_downstream=False) + prev_ti = None + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 1)) + dep_context = DepContext(ignore_depends_on_past=False) + + self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) + + def test_prev_ti_bad_state(self): + """ + If the previous TI did not complete execution this dep should fail. + """ + task = self._get_task(depends_on_past=True, + start_date=datetime(2016, 1, 1), + wait_for_downstream=False) + prev_ti = Mock(state=State.NONE, + are_dependents_done=Mock(return_value=True)) + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 2)) + dep_context = DepContext(ignore_depends_on_past=False) + + self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) + + def test_failed_wait_for_downstream(self): + """ + If the previous TI specified to wait for the downstream tasks of the + previous dagrun then it should fail this dep if the downstream TIs of + the previous TI are not done. 
+ """ + task = self._get_task(depends_on_past=True, + start_date=datetime(2016, 1, 1), + wait_for_downstream=True) + prev_ti = Mock(state=State.SUCCESS, + are_dependents_done=Mock(return_value=False)) + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 2)) + dep_context = DepContext(ignore_depends_on_past=False) + + self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) + + def test_all_met(self): + """ + Test to make sure all of the conditions for the dep are met + """ + task = self._get_task(depends_on_past=True, + start_date=datetime(2016, 1, 1), + wait_for_downstream=True) + prev_ti = Mock(state=State.SUCCESS, + are_dependents_done=Mock(return_value=True)) + ti = Mock(task=task, previous_ti=prev_ti, + execution_date=datetime(2016, 1, 2)) + dep_context = DepContext(ignore_depends_on_past=False) + + self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_runnable_exec_date_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py new file mode 100644 index 0000000..e1a396c --- /dev/null +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from datetime import datetime +from freezegun import freeze_time +from mock import Mock + +from airflow.models import TaskInstance +from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep + + +class RunnableExecDateDepTest(unittest.TestCase): + + def _get_task_instance(self, execution_date, dag_end_date=None, task_end_date=None): + dag = Mock(end_date=dag_end_date) + task = Mock(dag=dag, end_date=task_end_date) + return TaskInstance(task=task, execution_date=execution_date) + + @freeze_time('2016-01-01') + def test_exec_date_after_end_date(self): + """ + If the dag's execution date is in the future this dep should fail + """ + ti = self._get_task_instance( + dag_end_date=datetime(2016, 1, 3), + task_end_date=datetime(2016, 1, 3), + execution_date=datetime(2016, 1, 2), + ) + self.assertFalse(RunnableExecDateDep().is_met(ti=ti)) + + def test_exec_date_after_task_end_date(self): + """ + If the task instance execution date is after the tasks's end date + this dep should fail + """ + ti = self._get_task_instance( + dag_end_date=datetime(2016, 1, 3), + task_end_date=datetime(2016, 1, 1), + execution_date=datetime(2016, 1, 2), + ) + self.assertFalse(RunnableExecDateDep().is_met(ti=ti)) + + def test_exec_date_after_dag_end_date(self): + """ + If the task instance execution date is after the dag's end date + this dep should fail + """ + ti = self._get_task_instance( + dag_end_date=datetime(2016, 1, 1), + task_end_date=datetime(2016, 1, 3), + execution_date=datetime(2016, 1, 2), + ) + self.assertFalse(RunnableExecDateDep().is_met(ti=ti)) + + def test_all_deps_met(self): + """ + Test to make sure all of the conditions for the dep are met + """ + ti = self._get_task_instance( + dag_end_date=datetime(2016, 1, 2), + task_end_date=datetime(2016, 1, 2), + execution_date=datetime(2016, 1, 1), + ) + self.assertTrue(RunnableExecDateDep().is_met(ti=ti)) 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_trigger_rule_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py new file mode 100644 index 0000000..a61ff0d --- /dev/null +++ b/tests/ti_deps/deps/test_trigger_rule_dep.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from datetime import datetime + +from airflow.models import BaseOperator, TaskInstance +from airflow.utils.trigger_rule import TriggerRule +from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep +from airflow.utils.state import State + + +class TriggerRuleDepTest(unittest.TestCase): + + def _get_task_instance(self, trigger_rule=TriggerRule.ALL_SUCCESS, + state=None, upstream_task_ids=None): + task = BaseOperator(task_id='test_task', trigger_rule=trigger_rule, + start_date=datetime(2015, 1, 1)) + if upstream_task_ids: + task._upstream_task_ids.extend(upstream_task_ids) + return TaskInstance(task=task, state=state, execution_date=None) + + def test_no_upstream_tasks(self): + """ + If the TI has no upstream TIs then there is nothing to check and the dep is passed + """ + ti = self._get_task_instance(TriggerRule.ALL_DONE, State.UP_FOR_RETRY) + self.assertTrue(TriggerRuleDep().is_met(ti=ti)) + + def test_dummy_tr(self): + """ + The dummy trigger rule should always 
pass this dep + """ + ti = self._get_task_instance(TriggerRule.DUMMY, State.UP_FOR_RETRY) + self.assertTrue(TriggerRuleDep().is_met(ti=ti)) + + def test_one_success_tr_success(self): + """ + One-success trigger rule success + """ + ti = self._get_task_instance(TriggerRule.ONE_SUCCESS, State.UP_FOR_RETRY) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=2, + failed=2, + upstream_failed=2, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 0) + + def test_one_success_tr_failure(self): + """ + One-success trigger rule failure + """ + ti = self._get_task_instance(TriggerRule.ONE_SUCCESS) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=0, + skipped=2, + failed=2, + upstream_failed=2, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_one_failure_tr_failure(self): + """ + One-failure trigger rule failure + """ + ti = self._get_task_instance(TriggerRule.ONE_FAILED) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=2, + skipped=0, + failed=0, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_one_failure_tr_success(self): + """ + One-failure trigger rule success + """ + ti = self._get_task_instance(TriggerRule.ONE_FAILED) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=0, + skipped=2, + failed=2, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 0) + + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=0, + skipped=2, + failed=0, + upstream_failed=2, + done=2, + flag_upstream_failed=False, + session="Fake Session")) 
+ self.assertEqual(len(dep_statuses), 0) + + def test_all_success_tr_success(self): + """ + All-success trigger rule success + """ + ti = self._get_task_instance(TriggerRule.ALL_SUCCESS, + upstream_task_ids=["FakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=0, + failed=0, + upstream_failed=0, + done=1, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 0) + + def test_all_success_tr_failure(self): + """ + All-success trigger rule failure + """ + ti = self._get_task_instance(TriggerRule.ALL_SUCCESS, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=0, + failed=1, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_all_failed_tr_success(self): + """ + All-failed trigger rule success + """ + ti = self._get_task_instance(TriggerRule.ALL_FAILED, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=0, + skipped=0, + failed=2, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 0) + + def test_all_failed_tr_failure(self): + """ + All-failed trigger rule failure + """ + ti = self._get_task_instance(TriggerRule.ALL_FAILED, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=2, + skipped=0, + failed=0, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_all_done_tr_success(self): + """ + All-done trigger rule success + """ + ti = 
self._get_task_instance(TriggerRule.ALL_DONE, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=2, + skipped=0, + failed=0, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 0) + + def test_all_done_tr_failure(self): + """ + All-done trigger rule failure + """ + ti = self._get_task_instance(TriggerRule.ALL_DONE, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=0, + failed=0, + upstream_failed=0, + done=1, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_unknown_tr(self): + """ + Unknown trigger rules should cause this dep to fail + """ + ti = self._get_task_instance() + ti.task.trigger_rule = "Unknown Trigger Rule" + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=0, + failed=0, + upstream_failed=0, + done=1, + flag_upstream_failed=False, + session="Fake Session")) + + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_valid_state_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_valid_state_dep.py b/tests/ti_deps/deps/test_valid_state_dep.py new file mode 100644 index 0000000..2ece718 --- /dev/null +++ b/tests/ti_deps/deps/test_valid_state_dep.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from datetime import datetime +from mock import Mock + +from airflow import AirflowException +from airflow.ti_deps.deps.valid_state_dep import ValidStateDep +from airflow.utils.state import State + + +class ValidStateDepTest(unittest.TestCase): + + def test_valid_state(self): + """ + Valid state should pass this dep + """ + ti = Mock(state=State.QUEUED, end_date=datetime(2016, 1, 1)) + self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti)) + + def test_invalid_state(self): + """ + Invalid state should fail this dep + """ + ti = Mock(state=State.SUCCESS, end_date=datetime(2016, 1, 1)) + self.assertFalse(ValidStateDep({State.FAILED}).is_met(ti=ti)) + + def test_no_valid_states(self): + """ + If there are no valid states the dependency should throw + """ + ti = Mock(state=State.SUCCESS, end_date=datetime(2016, 1, 1)) + with self.assertRaises(AirflowException): + ValidStateDep({}).is_met(ti=ti) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/trigger_rule_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/trigger_rule_dep.py b/tests/ti_deps/deps/trigger_rule_dep.py deleted file mode 100644 index 04a7737..0000000 --- a/tests/ti_deps/deps/trigger_rule_dep.py +++ /dev/null @@ -1,295 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from airflow.utils.trigger_rule import TriggerRule -from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep -from airflow.utils.state import State -from fake_models import FakeTask, FakeTI - - -class TriggerRuleDepTest(unittest.TestCase): - - def test_no_upstream_tasks(self): - """ - If the TI has no upstream TIs then there is nothing to check and the dep is passed - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_DONE, - upstream_list=[]) - ti = FakeTI( - task=task, - state=State.UP_FOR_RETRY) - - self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None)) - - def test_dummy_tr(self): - """ - The dummy trigger rule should always pass this dep - """ - task = FakeTask( - trigger_rule=TriggerRule.DUMMY, - upstream_list=[]) - ti = FakeTI( - task=task, - state=State.UP_FOR_RETRY) - - self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None)) - - def test_one_success_tr_success(self): - """ - One-success trigger rule success - """ - task = FakeTask( - trigger_rule=TriggerRule.ONE_SUCCESS, - upstream_task_ids=[]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=1, - skipped=2, - failed=2, - upstream_failed=2, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - def test_one_success_tr_failure(self): - """ - One-success trigger rule failure - """ - task = FakeTask( - trigger_rule=TriggerRule.ONE_SUCCESS, - upstream_task_ids=[]) - ti = FakeTI(task=task) - - dep_statuses = 
tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=0, - skipped=2, - failed=2, - upstream_failed=2, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 1) - self.assertFalse(dep_statuses[0].passed) - - def test_one_failure_tr_failure(self): - """ - One-failure trigger rule failure - """ - task = FakeTask( - trigger_rule=TriggerRule.ONE_FAILED, - upstream_task_ids=[]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=2, - skipped=0, - failed=0, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - def test_one_failure_tr_success(self): - """ - One-failure trigger rule success - """ - task = FakeTask( - trigger_rule=TriggerRule.ONE_FAILED, - upstream_task_ids=[]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=0, - skipped=2, - failed=2, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=0, - skipped=2, - failed=0, - upstream_failed=2, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - def test_all_success_tr_success(self): - """ - All-success trigger rule success - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_SUCCESS, - upstream_task_ids=["FakeTaskID"]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=1, - skipped=0, - failed=0, - upstream_failed=0, - done=1, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - def test_all_success_tr_failure(self): - """ - All-success trigger rule failure - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_SUCCESS, - upstream_task_ids=["FakeTaskID", 
"OtherFakeTaskID"]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=1, - skipped=0, - failed=1, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 1) - self.assertFalse(dep_statuses[0].passed) - - def test_all_failed_tr_success(self): - """ - All-failed trigger rule success - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_FAILED, - upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=0, - skipped=0, - failed=2, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - def test_all_failed_tr_failure(self): - """ - All-failed trigger rule failure - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_FAILED, - upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=2, - skipped=0, - failed=0, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 1) - self.assertFalse(dep_statuses[0].passed) - - def test_all_done_tr_success(self): - """ - All-done trigger rule success - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_DONE, - upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=2, - skipped=0, - failed=0, - upstream_failed=0, - done=2, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 0) - - def test_all_done_tr_failure(self): - """ - All-done trigger rule failure - """ - task = FakeTask( - trigger_rule=TriggerRule.ALL_DONE, - upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]) - ti = 
FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=1, - skipped=0, - failed=0, - upstream_failed=0, - done=1, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 1) - self.assertFalse(dep_statuses[0].passed) - - def test_unknown_tr(self): - """ - Unknown trigger rules should cause this dep to fail - """ - task = FakeTask( - trigger_rule="Unknown Trigger Rule", - upstream_task_ids=[]) - ti = FakeTI(task=task) - - dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( - ti=ti, - successes=1, - skipped=0, - failed=0, - upstream_failed=0, - done=1, - flag_upstream_failed=False, - session="Fake Session")) - - self.assertEqual(len(dep_statuses), 1) - self.assertFalse(dep_statuses[0].passed) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/valid_state_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/valid_state_dep.py b/tests/ti_deps/deps/valid_state_dep.py deleted file mode 100644 index 6bc0835..0000000 --- a/tests/ti_deps/deps/valid_state_dep.py +++ /dev/null @@ -1,49 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import unittest -from datetime import datetime - -from airflow import AirflowException -from airflow.ti_deps.deps.valid_state_dep import ValidStateDep -from airflow.utils.state import State -from fake_models import FakeTI - - -class ValidStateDepTest(unittest.TestCase): - - def test_valid_state(self): - """ - Valid state should pass this dep - """ - ti = FakeTI(state=State.QUEUED, end_date=datetime(2016, 1, 1)) - - self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti, dep_context=None)) - - def test_invalid_state(self): - """ - Invalid state should fail this dep - """ - ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1)) - - self.assertFalse(ValidStateDep({State.FAILURE}).is_met(ti=ti, dep_context=None)) - - def test_no_valid_states(self): - """ - If there are no valid states the dependency should throw - """ - ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1)) - - with self.assertRaises(AirflowException): - ValidStateDep({}).is_met(ti=ti, dep_context=None)
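Note on the pattern used by the added tests: the deleted files built task instances from the hand-written FakeTI/FakeTask helpers in fake_models, while the new test_*.py files pass mock.Mock objects (or real TaskInstance objects wrapping a Mock task) to each dep's is_met(). The following is a minimal sketch of that pattern, not code from this commit. NotSkippedDepSketch is a hypothetical stand-in written for illustration only; it is not Airflow's NotSkippedDep, and the state strings are assumptions rather than values taken from airflow.utils.state. It uses the standard library's unittest.mock instead of the separate mock package so that it runs without extra dependencies.

```python
import unittest
from unittest.mock import Mock


class NotSkippedDepSketch:
    """Hypothetical stand-in for a ti_dep; NOT Airflow's implementation."""

    def is_met(self, ti):
        # The dep passes unless the task instance is in the "skipped" state.
        # The string states are assumptions made for this sketch.
        return ti.state != "skipped"


class NotSkippedDepSketchTest(unittest.TestCase):

    def test_skipped(self):
        # Mock(state=...) sets only the attribute the dep reads, which is
        # what replaces the FakeTI helper in the new tests.
        ti = Mock(state="skipped")
        self.assertFalse(NotSkippedDepSketch().is_met(ti=ti))

    def test_not_skipped(self):
        ti = Mock(state="running")
        self.assertTrue(NotSkippedDepSketch().is_met(ti=ti))
```

Saved as a module, the sketch can be run with python -m unittest. Because a Mock returns another Mock for any attribute that was not set explicitly, a test written this way only pins down the attributes it names; that keeps the fixtures short but means a dep that reads an unexpected attribute will not fail loudly.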