airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject [6/6] incubator-airflow git commit: [AIRFLOW-2203] Defer cycle detection
Date Wed, 14 Mar 2018 08:15:45 GMT
[AIRFLOW-2203] Defer cycle detection

Moved cycle detection from task-addition time to the point when the DAG is bagged.
This reduces DAG import runtime from polynomial to roughly linear.

Closes #3116 from wongwill86:dag_import_speed


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3730650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3730650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3730650

Branch: refs/heads/master
Commit: c3730650c852cd7a5e06a5933f5064bbb04e0e88
Parents: 6f0a0d2
Author: wongwill86 <wongwill86@gmail.com>
Authored: Mon Mar 12 16:53:01 2018 -0400
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Wed Mar 14 09:13:59 2018 +0100

----------------------------------------------------------------------
 airflow/exceptions.py |   4 +
 airflow/models.py     | 111 ++++++++---
 tests/core.py         |  16 --
 tests/models.py       | 450 ++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 533 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 90d3e22..c1b728c 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -34,3 +34,7 @@ class AirflowTaskTimeout(AirflowException):
 
 class AirflowSkipException(AirflowException):
     pass
+
+
+class AirflowDagCycleException(AirflowException):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8931ac6..c1b608a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -22,7 +22,7 @@ from future.standard_library import install_aliases
 from builtins import str
 from builtins import object, bytes
 import copy
-from collections import namedtuple
+from collections import namedtuple, defaultdict
 from datetime import timedelta
 
 import dill
@@ -63,7 +63,9 @@ import six
 from airflow import settings, utils
 from airflow.executors import GetDefaultExecutor, LocalExecutor
 from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
+from airflow.exceptions import (
+    AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout
+)
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -122,7 +124,6 @@ else:
 # Used by DAG context_managers
 _CONTEXT_MANAGER_DAG = None
 
-
 def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
     """
     Clears a set of task instances, but makes sure the running ones
@@ -183,6 +184,11 @@ class DagBag(BaseDagBag, LoggingMixin):
     :type include_examples: bool
     """
 
+    # static class variables to detetct dag cycle
+    CYCLE_NEW = 0
+    CYCLE_IN_PROGRESS = 1
+    CYCLE_DONE = 2
+
     def __init__(
             self,
             dag_folder=None,
@@ -335,10 +341,17 @@ class DagBag(BaseDagBag, LoggingMixin):
                         dag.full_filepath = filepath
                         if dag.fileloc != filepath:
                             dag.fileloc = filepath
-                    dag.is_subdag = False
-                    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
-                    found_dags.append(dag)
-                    found_dags += dag.subdags
+                    try:
+                        dag.is_subdag = False
+                        self.bag_dag(dag, parent_dag=dag, root_dag=dag)
+                        found_dags.append(dag)
+                        found_dags += dag.subdags
+                    except AirflowDagCycleException as cycle_exception:
+                        self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
+                        self.import_errors[dag.full_filepath] = str(cycle_exception)
+                        self.file_last_changed[dag.full_filepath] = \
+                            file_last_changed_on_disk
+
 
         self.file_last_changed[filepath] = file_last_changed_on_disk
         return found_dags
@@ -381,20 +394,39 @@ class DagBag(BaseDagBag, LoggingMixin):
     def bag_dag(self, dag, parent_dag, root_dag):
         """
         Adds the DAG into the bag, recurses into sub dags.
+        Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
         """
-        self.dags[dag.dag_id] = dag
+
+        dag.test_cycle()  # throws if a task cycle is found
+
         dag.resolve_template_files()
         dag.last_loaded = timezone.utcnow()
 
         for task in dag.tasks:
             settings.policy(task)
 
-        for subdag in dag.subdags:
-            subdag.full_filepath = dag.full_filepath
-            subdag.parent_dag = dag
-            subdag.is_subdag = True
-            self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
-        self.log.debug('Loaded DAG {dag}'.format(**locals()))
+        subdags = dag.subdags
+
+        try:
+            for subdag in subdags:
+                subdag.full_filepath = dag.full_filepath
+                subdag.parent_dag = dag
+                subdag.is_subdag = True
+                self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
+
+            self.dags[dag.dag_id] = dag
+            self.log.debug('Loaded DAG {dag}'.format(**locals()))
+        except AirflowDagCycleException as cycle_exception:
+            # There was an error in bagging the dag. Remove it from the list of dags
+            self.log.exception('Exception bagging dag: {dag.dag_id}'.format(**locals()))
+            # Only necessary at the root level since DAG.subdags automatically
+            # performs DFS to search through all subdags
+            if dag == root_dag:
+                for subdag in subdags:
+                    if subdag.dag_id in self.dags:
+                        del self.dags[subdag.dag_id]
+            raise cycle_exception
+
 
     def collect_dags(
             self,
@@ -2699,21 +2731,6 @@ class BaseOperator(LoggingMixin):
         return list(map(lambda task_id: self._dag.task_dict[task_id],
                         self.get_flat_relative_ids(upstream)))
 
-    def detect_downstream_cycle(self, task=None):
-        """
-        When invoked, this routine will raise an exception if a cycle is
-        detected downstream from self. It is invoked when tasks are added to
-        the DAG to detect cycles.
-        """
-        if not task:
-            task = self
-        for t in self.get_direct_relatives():
-            if task is t:
-                msg = "Cycle detected in DAG. Faulty task: {0}".format(task)
-                raise AirflowException(msg)
-            else:
-                t.detect_downstream_cycle(task=task)
-        return False
 
     def run(
             self,
@@ -4074,6 +4091,42 @@ class DAG(BaseDag, LoggingMixin):
                 qry = qry.filter(TaskInstance.state.in_(states))
         return qry.scalar()
 
+    def test_cycle(self):
+        '''
+        Check to see if there are any cycles in the DAG. Returns False if no cycle found,
+        otherwise raises exception.
+        '''
+
+        # default of int is 0 which corresponds to CYCLE_NEW
+        visit_map = defaultdict(int)
+        for task_id in self.task_dict.keys():
+            # print('starting %s' % task_id)
+            if visit_map[task_id] == DagBag.CYCLE_NEW:
+                self._test_cycle_helper(visit_map, task_id)
+        return False
+
+    def _test_cycle_helper(self, visit_map, task_id):
+        '''
+        Checks if a cycle exists from the input task using DFS traversal
+        '''
+
+        # print('Inspecting %s' % task_id)
+        if visit_map[task_id] == DagBag.CYCLE_DONE:
+            return False
+
+        visit_map[task_id] = DagBag.CYCLE_IN_PROGRESS
+
+        task = self.task_dict[task_id]
+        for descendant_id in task.get_direct_relative_ids():
+            if visit_map[descendant_id] == DagBag.CYCLE_IN_PROGRESS:
+                msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(
+                    task_id, descendant_id)
+                raise AirflowDagCycleException(msg)
+            else:
+                self._test_cycle_helper(visit_map, descendant_id)
+
+        visit_map[task_id] = DagBag.CYCLE_DONE
+
 
 class Chart(Base):
     __tablename__ = "chart"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 03372db..ce5fb7a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -781,22 +781,6 @@ class CoreTest(unittest.TestCase):
         with self.assertRaisesRegexp(AirflowException, regexp):
             self.run_after_loop.set_upstream(self.runme_0)
 
-    def test_cyclic_dependencies_1(self):
-
-        regexp = "Cycle detected in DAG. (.*)runme_0(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.runme_0.set_upstream(self.run_after_loop)
-
-    def test_cyclic_dependencies_2(self):
-        regexp = "Cycle detected in DAG. (.*)run_after_loop(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.run_after_loop.set_downstream(self.runme_0)
-
-    def test_cyclic_dependencies_3(self):
-        regexp = "Cycle detected in DAG. (.*)run_this_last(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.run_this_last.set_downstream(self.runme_0)
-
     def test_bad_trigger_rule(self):
         with self.assertRaises(AirflowException):
             DummyOperator(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index c8ee037..5d8184c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -26,9 +26,11 @@ import time
 import six
 import re
 import urllib
+import textwrap
+import inspect
 
 from airflow import configuration, models, settings, AirflowException
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
 from airflow.jobs import BackfillJob
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
@@ -47,7 +49,7 @@ from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from mock import patch
 from parameterized import parameterized
-
+from tempfile import NamedTemporaryFile
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -388,6 +390,132 @@ class DagTest(unittest.TestCase):
         result = task.render_template('', "{{ 'world' | hello}}", dict())
         self.assertEqual(result, 'Hello world')
 
+    def test_cycle(self):
+        # test empty
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        self.assertFalse(dag.test_cycle())
+
+        # test single task
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            opA = DummyOperator(task_id='A')
+
+        self.assertFalse(dag.test_cycle())
+
+        # test no cycle
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C
+        #      B -> D
+        # E -> F
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opF = DummyOperator(task_id='F')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opB.set_downstream(opD)
+            opE.set_downstream(opF)
+
+        self.assertFalse(dag.test_cycle())
+
+        # test self loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> A
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opA.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # test downstream self loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C -> D -> E -> E
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opC.set_downstream(opD)
+            opD.set_downstream(opE)
+            opE.set_downstream(opE)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # large loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C -> D -> E -> A
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opC.set_downstream(opD)
+            opD.set_downstream(opE)
+            opE.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # test arbitrary loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # E-> A -> B -> F -> A
+        #       -> C -> F
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opF = DummyOperator(task_id='F')
+            opA.set_downstream(opB)
+            opA.set_downstream(opC)
+            opE.set_downstream(opA)
+            opC.set_downstream(opF)
+            opB.set_downstream(opF)
+            opF.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
 
 class DagStatTest(unittest.TestCase):
     def test_dagstats_crud(self):
@@ -796,7 +924,6 @@ class DagBagTest(unittest.TestCase):
         """
         test that we're able to parse file that contains multi-byte char
         """
-        from tempfile import NamedTemporaryFile
         f = NamedTemporaryFile()
         f.write('\u3042'.encode('utf8'))  # write multi-byte char (hiragana)
         f.flush()
@@ -857,6 +984,323 @@ class DagBagTest(unittest.TestCase):
             self.assertTrue(
                 dag.fileloc.endswith('airflow/example_dags/' + path))
 
+    def process_dag(self, create_dag):
+        """
+        Helper method to process a file generated from the input create_dag function.
+        """
+        # write source to file
+        source = textwrap.dedent(''.join(
+            inspect.getsource(create_dag).splitlines(True)[1:-1]))
+        f = NamedTemporaryFile()
+        f.write(source.encode('utf8'))
+        f.flush()
+
+        dagbag = models.DagBag(include_examples=False)
+        found_dags = dagbag.process_file(f.name)
+        return (dagbag, found_dags, f.name)
+
+    def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag,
+                      should_be_found=True):
+        expected_dag_ids = list(map(lambda dag: dag.dag_id, expected_parent_dag.subdags))
+        expected_dag_ids.append(expected_parent_dag.dag_id)
+
+        actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
+
+        for dag_id in expected_dag_ids:
+            actual_dagbag.log.info('validating %s' % dag_id)
+            self.assertEquals(
+                dag_id in actual_found_dag_ids, should_be_found,
+                'dag "%s" should %shave been found after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+            self.assertEquals(
+                dag_id in actual_dagbag.dags, should_be_found,
+                'dag "%s" should %sbe in dagbag.dags after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+
+    def test_load_subdags(self):
+        # Define Dag to load
+        def standard_subdag():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'master'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # master:
+            #     A -> opSubDag_0
+            #          master.opsubdag_0:
+            #              -> subdag_0.task
+            #     A -> opSubDag_1
+            #          master.opsubdag_1:
+            #              -> subdag_1.task
+
+            with dag:
+                def subdag_0():
+                    subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_0.task', dag=subdag_0)
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_1.task', dag=subdag_1)
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+            return dag
+
+        testDag = standard_subdag()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 2)
+
+        # Perform processing dag
+        dagbag, found_dags, _ = self.process_dag(standard_subdag)
+
+        # Validate correctness
+        # all dags from testDag should be listed
+        self.validate_dags(testDag, found_dags, dagbag)
+
+        # Define Dag to load
+        def nested_subdags():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'master'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # master:
+            #     A -> opSubdag_0
+            #          master.opSubdag_0:
+            #              -> opSubDag_A
+            #                 master.opSubdag_0.opSubdag_A:
+            #                     -> subdag_A.task
+            #              -> opSubdag_B
+            #                 master.opSubdag_0.opSubdag_B:
+            #                     -> subdag_B.task
+            #     A -> opSubdag_1
+            #          master.opSubdag_1:
+            #              -> opSubdag_C
+            #                 master.opSubdag_1.opSubdag_C:
+            #                     -> subdag_C.task
+            #              -> opSubDag_D
+            #                 master.opSubdag_1.opSubdag_D:
+            #                     -> subdag_D.task
+
+            with dag:
+                def subdag_A():
+                    subdag_A = DAG(
+                        'master.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+                    return subdag_A
+
+                def subdag_B():
+                    subdag_B = DAG(
+                        'master.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+                    return subdag_B
+
+                def subdag_C():
+                    subdag_C = DAG(
+                        'master.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_C.task', dag=subdag_C)
+                    return subdag_C
+
+                def subdag_D():
+                    subdag_D = DAG(
+                        'master.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+                    return subdag_D
+
+                def subdag_0():
+                    subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+                    SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+                    SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+
+            return dag
+
+        testDag = nested_subdags()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 6)
+
+        # Perform processing dag
+        dagbag, found_dags, _ = self.process_dag(nested_subdags)
+
+        # Validate correctness
+        # all dags from testDag should be listed
+        self.validate_dags(testDag, found_dags, dagbag)
+
+    def test_skip_cycle_dags(self):
+        """
+        Don't crash when loading an invalid (contains a cycle) DAG file.
+        Don't load the dag into the DagBag either
+        """
+        # Define Dag to load
+        def basic_cycle():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            import datetime
+            DAG_NAME = 'cycle_dag'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # A -> A
+            with dag:
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opA)
+
+            return dag
+
+        testDag = basic_cycle()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 0)
+
+        # Perform processing dag
+        dagbag, found_dags, file_path = self.process_dag(basic_cycle)
+
+        # #Validate correctness
+        # None of the dags should be found
+        self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+        self.assertIn(file_path, dagbag.import_errors)
+
+        # Define Dag to load
+        def nested_subdag_cycle():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'nested_cycle'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # cycle:
+            #     A -> opSubdag_0
+            #          cycle.opSubdag_0:
+            #              -> opSubDag_A
+            #                 cycle.opSubdag_0.opSubdag_A:
+            #                     -> subdag_A.task
+            #              -> opSubdag_B
+            #                 cycle.opSubdag_0.opSubdag_B:
+            #                     -> subdag_B.task
+            #     A -> opSubdag_1
+            #          cycle.opSubdag_1:
+            #              -> opSubdag_C
+            #                 cycle.opSubdag_1.opSubdag_C:
+            #                     -> subdag_C.task -> subdag_C.task  >Invalid Loop<
+            #              -> opSubDag_D
+            #                 cycle.opSubdag_1.opSubdag_D:
+            #                     -> subdag_D.task
+
+            with dag:
+                def subdag_A():
+                    subdag_A = DAG(
+                        'nested_cycle.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+                    return subdag_A
+
+                def subdag_B():
+                    subdag_B = DAG(
+                        'nested_cycle.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+                    return subdag_B
+
+                def subdag_C():
+                    subdag_C = DAG(
+                        'nested_cycle.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+                    opSubdag_C_task = DummyOperator(
+                        task_id='subdag_C.task', dag=subdag_C)
+                    # introduce a loop in opSubdag_C
+                    opSubdag_C_task.set_downstream(opSubdag_C_task)
+                    return subdag_C
+
+                def subdag_D():
+                    subdag_D = DAG(
+                        'nested_cycle.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+                    return subdag_D
+
+                def subdag_0():
+                    subdag_0 = DAG('nested_cycle.opSubdag_0', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+                    SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('nested_cycle.opSubdag_1', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+                    SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+
+            return dag
+
+        testDag = nested_subdag_cycle()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 6)
+
+        # Perform processing dag
+        dagbag, found_dags, file_path = self.process_dag(nested_subdag_cycle)
+
+        # Validate correctness
+        # None of the dags should be found
+        self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+        self.assertIn(file_path, dagbag.import_errors)
+
     def test_process_file_with_none(self):
         """
         test that process_file can handle Nones


Mime
View raw message