Repository: incubator-airflow
Updated Branches:
refs/heads/master 67b47c958 -> ea86895d5
[AIRFLOW-1420][AIRFLOW-1473] Fix deadlock check
Update the deadlock check to prevent false positives on upstream failure or skip conditions.
Closes #2506 from gwax/fix_dead_dagruns
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ea86895d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ea86895d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ea86895d
Branch: refs/heads/master
Commit: ea86895d5b81d6fed4f26c201f8874bacdd291e5
Parents: 67b47c9
Author: George Leslie-Waksman <george@cloverhealth.com>
Authored: Thu Aug 17 15:19:46 2017 -0700
Committer: Alex Guziel <alex.guziel@airbnb.com>
Committed: Thu Aug 17 15:19:52 2017 -0700
----------------------------------------------------------------------
airflow/models.py | 21 +++++++++------
airflow/ti_deps/deps/trigger_rule_dep.py | 4 +--
tests/models.py | 37 ++++++++++++++++++++++++---
3 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0b82c56..bf308e5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4231,7 +4231,6 @@ class DagRun(Base):
ID_PREFIX = 'scheduled__'
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
- DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True)
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
@@ -4457,13 +4456,19 @@ class DagRun(Base):
# small speed up
if unfinished_tasks and none_depends_on_past:
# todo: this can actually get pretty slow: one task costs between 0.01-015s
- no_dependencies_met = all(
- # Use a special dependency context that ignores task's up for retry
- # dependency, since a task that is up for retry is not necessarily
- # deadlocked.
- not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT,
- session=session)
- for t in unfinished_tasks)
+ no_dependencies_met = True
+ for ut in unfinished_tasks:
+ # We need to flag upstream and check for changes because upstream
+ # failures can result in deadlock false positives
+ old_state = ut.state
+ deps_met = ut.are_dependencies_met(
+ dep_context=DepContext(
+ flag_upstream_failed=True,
+ ignore_in_retry_period=True),
+ session=session)
+ if deps_met or old_state != ut.current_state(session=session):
+ no_dependencies_met = False
+ break
duration = (datetime.now() - start_dttm).total_seconds() * 1000
Stats.timing("dagrun.dependency-check.{}.{}".
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index cf06c0b..5a80314 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -124,8 +124,8 @@ class TriggerRuleDep(BaseTIDep):
tr = task.trigger_rule
upstream_done = done >= upstream
upstream_tasks_state = {
- "successes": successes, "skipped": skipped, "failed": failed,
- "upstream_failed": upstream_failed, "done": done
+ "total": upstream, "successes": successes, "skipped": skipped,
+ "failed": failed, "upstream_failed": upstream_failed, "done": done
}
# TODO(aoen): Ideally each individual trigger rules would be it's own class, but
# this isn't very feasible at the moment since the database queries need to be
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 266e036..96275d3 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -37,6 +37,7 @@ from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.utils.state import State
+from airflow.utils.trigger_rule import TriggerRule
from mock import patch
from parameterized import parameterized
@@ -483,10 +484,38 @@ class DagRunTest(unittest.TestCase):
state = dr.update_state()
self.assertEqual(State.SUCCESS, state)
- # upstream dependency failed, root has not run
- ti_op1.set_state(State.NONE, session)
- state = dr.update_state()
- self.assertEqual(State.FAILED, state)
+ def test_dagrun_deadlock(self):
+ session = settings.Session()
+ dag = DAG(
+ 'text_dagrun_deadlock',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = DummyOperator(task_id='A')
+ op2 = DummyOperator(task_id='B')
+ op2.trigger_rule = TriggerRule.ONE_FAILED
+ op2.set_upstream(op1)
+
+ dag.clear()
+ now = datetime.datetime.now()
+ dr = dag.create_dagrun(run_id='test_dagrun_deadlock',
+ state=State.RUNNING,
+ execution_date=now,
+ start_date=now)
+
+ ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+ ti_op1.set_state(state=State.SUCCESS, session=session)
+ ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+ ti_op2.set_state(state=State.NONE, session=session)
+
+ dr.update_state()
+ self.assertEqual(dr.state, State.RUNNING)
+
+ ti_op2.set_state(state=State.NONE, session=session)
+ op2.trigger_rule = 'invalid'
+ dr.update_state()
+ self.assertEqual(dr.state, State.FAILED)
def test_get_task_instance_on_empty_dagrun(self):
"""
|