airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davy...@apache.org
Subject [5/5] incubator-airflow git commit: Add logic to lock DB and avoid race condition
Date Mon, 09 May 2016 23:52:40 GMT
Add logic to lock DB and avoid race condition

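The scheduler can encounter a queued task twice before the
task actually starts to run. This change lets refresh_from_db
lock the TaskInstance row (issuing a FOR UPDATE clause) until the
session is committed, which avoids that race condition.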


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/77c7bc4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/77c7bc4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/77c7bc4a

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 77c7bc4ac11513b63fd82d9dd9b2a98e13ff06e0
Parents: edc718b
Author: jlowin <jlowin@users.noreply.github.com>
Authored: Fri May 6 15:05:33 2016 -0400
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Mon May 9 16:26:56 2016 -0700

----------------------------------------------------------------------
 airflow/models.py | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77c7bc4a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c3a01b0..58d6fd8 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -775,16 +775,25 @@ class TaskInstance(Base):
         session.commit()
 
     @provide_session
-    def refresh_from_db(self, session=None):
+    def refresh_from_db(self, session=None, lock_for_update=False):
         """
         Refreshes the task instance from the database based on the primary key
+
+        :param lock_for_update: if True, indicates that the database should
+        lock the TaskInstance (issuing a FOR UPDATE clause) until the session
+        is committed.
         """
         TI = TaskInstance
-        ti = session.query(TI).filter(
+
+        qry = session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.task_id == self.task_id,
-            TI.execution_date == self.execution_date,
-        ).first()
+            TI.execution_date == self.execution_date)
+
+        if lock_for_update:
+            ti = qry.with_for_update().first()
+        else:
+            ti = qry.first()
         if ti:
             self.state = ti.state
             self.start_date = ti.start_date
@@ -1117,7 +1126,7 @@ class TaskInstance(Base):
         self.pool = pool or task.pool
         self.test_mode = test_mode
         self.force = force
-        self.refresh_from_db()
+        self.refresh_from_db(session=session, lock_for_update=True)
         self.clear_xcom_data()
         self.job_id = job_id
         iso = datetime.now().isoformat()


Mime
View raw message