airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] mik-laj commented on a change in pull request #6954: [AIRFLOW-4355] removed task should not lead to dagrun success
Date Tue, 07 Jan 2020 00:21:24 GMT
mik-laj commented on a change in pull request #6954: [AIRFLOW-4355] removed task should not
lead to dagrun success
URL: https://github.com/apache/airflow/pull/6954#discussion_r363543585
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -311,32 +313,42 @@ def update_state(self, session=None):
 
         leaf_tis = [ti for ti in tis if ti.task_id in {t.task_id for t in dag.leaves}]
 
-        # if all roots finished and at least one failed, the run failed
-        if not unfinished_tasks and any(
-            leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
-        ):
-            self.log.info('Marking run %s failed', self)
-            self.set_state(State.FAILED)
-            dag.handle_callback(self, success=False, reason='task_failure',
-                                session=session)
-
-        # if all leafs succeeded and no unfinished tasks, the run succeeded
-        elif not unfinished_tasks and all(
-            leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in leaf_tis
-        ):
-            self.log.info('Marking run %s successful', self)
-            self.set_state(State.SUCCESS)
-            dag.handle_callback(self, success=True, reason='success', session=session)
-
-        # if *all tasks* are deadlocked, the run failed
-        elif (unfinished_tasks and none_depends_on_past and
-              none_task_concurrency and no_dependencies_met):
-            self.log.info('Deadlock; marking run %s failed', self)
-            self.set_state(State.FAILED)
-            dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
-                                session=session)
-
-        # finally, if the roots aren't done, the dag is still running
+        if conf.getboolean('scheduler', 'REMOVED_TASKS_LEAD_TO_DAGRUN_FAILURE', fallback=False):
+            # REMOVED state counted as unfinished
+            unfinished_tasks = self.get_task_instances(
+                state=State.unfinished_or_removed(),
+                session=session
+            )
+
+        if len(tis) == len(dag.active_tasks):
 
 Review comment:
   ```suggestion
           if len(tis) == len(dag.tasks):
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message