airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From saguz...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1420][AIRFLOW-1473] Fix deadlock check
Date Thu, 17 Aug 2017 22:20:27 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 67b47c958 -> ea86895d5


[AIRFLOW-1420][AIRFLOW-1473] Fix deadlock check

Update the deadlock check to prevent false positives on upstream
failure or skip conditions.

Closes #2506 from gwax/fix_dead_dagruns


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ea86895d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ea86895d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ea86895d

Branch: refs/heads/master
Commit: ea86895d5b81d6fed4f26c201f8874bacdd291e5
Parents: 67b47c9
Author: George Leslie-Waksman <george@cloverhealth.com>
Authored: Thu Aug 17 15:19:46 2017 -0700
Committer: Alex Guziel <alex.guziel@airbnb.com>
Committed: Thu Aug 17 15:19:52 2017 -0700

----------------------------------------------------------------------
 airflow/models.py                        | 21 +++++++++------
 airflow/ti_deps/deps/trigger_rule_dep.py |  4 +--
 tests/models.py                          | 37 ++++++++++++++++++++++++---
 3 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0b82c56..bf308e5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4231,7 +4231,6 @@ class DagRun(Base):
 
     ID_PREFIX = 'scheduled__'
     ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
-    DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True)
 
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
@@ -4457,13 +4456,19 @@ class DagRun(Base):
         # small speed up
         if unfinished_tasks and none_depends_on_past:
             # todo: this can actually get pretty slow: one task costs between 0.01-015s
-            no_dependencies_met = all(
-                # Use a special dependency context that ignores task's up for retry
-                # dependency, since a task that is up for retry is not necessarily
-                # deadlocked.
-                not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT,
-                                           session=session)
-                for t in unfinished_tasks)
+            no_dependencies_met = True
+            for ut in unfinished_tasks:
+                # We need to flag upstream and check for changes because upstream
+                # failures can result in deadlock false positives
+                old_state = ut.state
+                deps_met = ut.are_dependencies_met(
+                    dep_context=DepContext(
+                        flag_upstream_failed=True,
+                        ignore_in_retry_period=True),
+                    session=session)
+                if deps_met or old_state != ut.current_state(session=session):
+                    no_dependencies_met = False
+                    break
 
         duration = (datetime.now() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}.{}".

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index cf06c0b..5a80314 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -124,8 +124,8 @@ class TriggerRuleDep(BaseTIDep):
         tr = task.trigger_rule
         upstream_done = done >= upstream
         upstream_tasks_state = {
-            "successes": successes, "skipped": skipped, "failed": failed,
-            "upstream_failed": upstream_failed, "done": done
+            "total": upstream, "successes": successes, "skipped": skipped,
+            "failed": failed, "upstream_failed": upstream_failed, "done": done
         }
         # TODO(aoen): Ideally each individual trigger rules would be it's own class, but
         # this isn't very feasible at the moment since the database queries need to be

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 266e036..96275d3 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -37,6 +37,7 @@ from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
+from airflow.utils.trigger_rule import TriggerRule
 from mock import patch
 from parameterized import parameterized
 
@@ -483,10 +484,38 @@ class DagRunTest(unittest.TestCase):
         state = dr.update_state()
         self.assertEqual(State.SUCCESS, state)
 
-        # upstream dependency failed, root has not run
-        ti_op1.set_state(State.NONE, session)
-        state = dr.update_state()
-        self.assertEqual(State.FAILED, state)
+    def test_dagrun_deadlock(self):
+        session = settings.Session()
+        dag = DAG(
+            'text_dagrun_deadlock',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op2.trigger_rule = TriggerRule.ONE_FAILED
+            op2.set_upstream(op1)
+
+        dag.clear()
+        now = datetime.datetime.now()
+        dr = dag.create_dagrun(run_id='test_dagrun_deadlock',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.SUCCESS, session=session)
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op2.set_state(state=State.NONE, session=session)
+
+        dr.update_state()
+        self.assertEqual(dr.state, State.RUNNING)
+
+        ti_op2.set_state(state=State.NONE, session=session)
+        op2.trigger_rule = 'invalid'
+        dr.update_state()
+        self.assertEqual(dr.state, State.FAILED)
 
     def test_get_task_instance_on_empty_dagrun(self):
         """


Mime
View raw message