airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davy...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-897] Prevent dagruns from failing with unfinished tasks
Date Fri, 24 Feb 2017 22:29:21 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2bceeed80 -> ffec38109


[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks

Closes #2099 from
aoen/ddavydov/fix_premature_dagrun_failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ffec3810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ffec3810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ffec3810

Branch: refs/heads/master
Commit: ffec381099cc0852b0f01237103abe5124205a3b
Parents: 2bceeed
Author: Dan Davydov <dan.davydov@airbnb.com>
Authored: Fri Feb 24 14:29:11 2017 -0800
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Fri Feb 24 14:29:14 2017 -0800

----------------------------------------------------------------------
 airflow/models.py             |  6 +++---
 tests/dags/test_issue_1225.py | 13 +++++++++++++
 tests/jobs.py                 | 24 ++++++++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffec3810/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index db9112a..37f8823 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4015,12 +4015,12 @@ class DagRun(Base):
 
         # future: remove the check on adhoc tasks (=active_tasks)
         if len(tis) == len(dag.active_tasks):
-            # if any roots failed, the run failed
             root_ids = [t.task_id for t in dag.roots]
             roots = [t for t in tis if t.task_id in root_ids]
 
-            if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
-                   for r in roots):
+            # if all roots finished and at least one failed, the run failed
+            if (not unfinished_tasks and
+                    any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffec3810/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 021561f..d01fd79 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -129,3 +129,16 @@ dag7_subdag1 = SubDagOperator(
     subdag=subdag7)
 subdag7_task1.set_downstream(subdag7_task2)
 subdag7_task2.set_downstream(subdag7_task3)
+
+# DAG tests that a Dag run that doesn't complete but has a root failure is marked running
+dag8 = DAG(dag_id='test_dagrun_states_root_fail_unfinished', default_args=default_args)
+dag8_task1 = DummyOperator(
+    task_id='test_dagrun_unfinished',  # The test will unset the task instance state after
+                                       # running this task
+    dag=dag8,
+)
+dag8_task2 = PythonOperator(
+    task_id='test_dagrun_fail',
+    dag=dag8,
+    python_callable=fail,
+)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffec3810/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 71470e3..d5fdc26 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -358,6 +358,30 @@ class SchedulerJobTest(unittest.TestCase):
             },
             dagrun_state=State.FAILED)
 
+    def test_dagrun_root_fail_unfinished(self):
+        """
+        DagRuns with one unfinished and one failed root task -> RUNNING
+        """
+        # Run both the failed and successful tasks
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        dag_id = 'test_dagrun_states_root_fail_unfinished'
+        dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
+        dr = scheduler.create_dag_run(dag)
+        try:
+            dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
+        except AirflowException:  # Expect an exception since there is a failed task
+            pass
+
+        # Mark the successful task as never having run since we want to see if the
+        # dagrun will be in a running state despite having an unfinished task.
+        session = settings.Session()
+        ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
+        ti.state = State.NONE
+        session.commit()
+        dr_state = dr.update_state()
+        self.assertEqual(dr_state, State.RUNNING)
+
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
         """
         DagRun is marked a success if ignore_first_depends_on_past=True


Mime
View raw message