airflow-commits mailing list archives

From davy...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-316] Always check DB state for Backfill Job execution
Date Wed, 06 Jul 2016 21:44:46 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 92064398c -> 635c97a60


[AIRFLOW-316] Always check DB state for Backfill Job execution

Closes #1654 from aoen/ddavydov/dont_skip_db_state_check_for_subdag

When a backfill job determines which task instances have not yet completed
execution, always check the DB state, not just the job's local state.
This avoids potential race conditions, e.g. two backfill jobs running the
same task instance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/635c97a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/635c97a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/635c97a6

Branch: refs/heads/master
Commit: 635c97a60636c223d374452258465fdc337f0913
Parents: 9206439
Author: Dan Davydov <dan.davydov@airbnb.com>
Authored: Wed Jul 6 14:44:21 2016 -0700
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Wed Jul 6 14:44:28 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/635c97a6/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index b3270b0..a0749c0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -896,17 +896,16 @@ class BackfillJob(BaseJob):
                     ti.execution_date == (start_date or ti.start_date))
                 # The task was already marked successful or skipped by a
                 # different Job. Don't rerun it.
-                if key not in started:
-                    if ti.state == State.SUCCESS:
-                        succeeded.add(key)
-                        tasks_to_run.pop(key)
-                        session.commit()
-                        continue
-                    elif ti.state == State.SKIPPED:
-                        skipped.add(key)
-                        tasks_to_run.pop(key)
-                        session.commit()
-                        continue
+                if ti.state == State.SUCCESS:
+                    succeeded.add(key)
+                    tasks_to_run.pop(key)
+                    session.commit()
+                    continue
+                elif ti.state == State.SKIPPED:
+                    skipped.add(key)
+                    tasks_to_run.pop(key)
+                    session.commit()
+                    continue
 
                 # Is the task runnable? -- then run it
                 if ti.is_queueable(
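
The effect of removing the `if key not in started:` guard can be sketched in isolation. This is a minimal sketch, not Airflow's code. The `filter_finished` helper, the `gate_on_started` flag, the string task keys, the plain-dict "DB", and the `State` stand-in are all hypothetical. It also leaves out the `session.commit()` calls. With `gate_on_started=True` it mimics the pre-patch logic, and with `False` it mimics the patched logic.

```python
from typing import Dict, Set, Tuple


class State:
    # Hypothetical stand-in for Airflow's State constants.
    SUCCESS = "success"
    SKIPPED = "skipped"
    RUNNING = "running"


def filter_finished(
    tasks_to_run: Dict[str, object],
    started: Set[str],
    db_state: Dict[str, str],
    gate_on_started: bool,
) -> Tuple[Dict[str, object], Set[str], Set[str]]:
    """Drop tasks whose DB state says another job already finished them.

    Returns (remaining, succeeded, skipped). The input dict is not mutated.
    """
    remaining = dict(tasks_to_run)
    succeeded: Set[str] = set()
    skipped: Set[str] = set()
    for key in list(remaining):
        if gate_on_started and key in started:
            # Pre-patch behaviour: tasks this job already started never
            # have their DB state consulted, so another job's result is missed.
            continue
        state = db_state.get(key)  # None means "no state recorded"
        if state == State.SUCCESS:
            succeeded.add(key)
            remaining.pop(key)
        elif state == State.SKIPPED:
            skipped.add(key)
            remaining.pop(key)
    return remaining, succeeded, skipped


if __name__ == "__main__":
    tasks = {"t1": "ti1", "t2": "ti2", "t3": "ti3"}
    started = {"t1"}  # this job already started t1...
    db = {"t1": State.SUCCESS, "t3": State.SKIPPED}  # ...but another job finished it
    print("before:", filter_finished(tasks, started, db, gate_on_started=True))
    print("after: ", filter_finished(tasks, started, db, gate_on_started=False))
```

In the gated run, `t1` stays in `remaining` and would be run again even though the DB already records it as successful. In the ungated run it moves to `succeeded`. The skipped, never-started `t3` is filtered out in both cases.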

