airflow-commits mailing list archives

From bo...@apache.org
Subject [1/2] incubator-airflow git commit: [AIRFLOW-214] Fix occasion of detached taskinstance
Date Tue, 07 Jun 2016 06:44:59 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 18009d033 -> 03ce4b9c0


[AIRFLOW-214] Fix occasion of detached taskinstance

For some reason, task instances could occasionally become
detached from the database session. The scheduler now fetches them
with a fresh session to ensure the task instances stay attached.
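
The diff below stops passing `session=session` to `run.get_task_instances(...)`. The following is a minimal, stdlib-only sketch of why omitting that keyword could yield a fresh session, assuming the method is wrapped by a decorator along the lines of Airflow's `provide_session`. The diff itself does not show that decorator, so `FakeSession`, this simplified `provide_session`, and the module-level `get_task_instances` are hypothetical stand-ins rather than Airflow's actual code.

```python
import functools
import itertools

_session_ids = itertools.count(1)


class FakeSession:
    """Hypothetical stand-in for a database session (not SQLAlchemy)."""

    def __init__(self):
        self.id = next(_session_ids)
        self.closed = False

    def close(self):
        self.closed = True


def provide_session(func):
    """Assumed behaviour: open a fresh session only when none is passed."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # The caller supplied a session, so reuse it and leave it open.
            return func(*args, **kwargs)
        session = FakeSession()
        kwargs["session"] = session
        try:
            return func(*args, **kwargs)
        finally:
            # The wrapper owns the session it created and closes it.
            session.close()

    return wrapper


@provide_session
def get_task_instances(state=None, session=None):
    # A real implementation would query the database here; this sketch
    # returns the session so the caller can see which one was used.
    return session


scheduler_session = FakeSession()

# Before the commit: the scheduler's long-lived session is passed through.
reused = get_task_instances(state=("none",), session=scheduler_session)

# After the commit: no session is passed, so a fresh one is opened.
fresh = get_task_instances(state=("none",))

print(reused is scheduler_session, fresh is scheduler_session)
```

In this sketch the first call runs against the caller's session and the second against a newly created one, which is the behaviour the commit message relies on.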


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1e48c2b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1e48c2b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1e48c2b9

Branch: refs/heads/master
Commit: 1e48c2b914375feaf7b8a3204cc29364a2c0cd02
Parents: 89edb6f
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Mon Jun 6 17:52:58 2016 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Jun 6 21:30:57 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1e48c2b9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 005871f..5aaab3b 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -503,7 +503,7 @@ class SchedulerJob(BaseJob):
         session.commit()
 
         # update the state of the previously active dag runs
-        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)
+        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = []
         for run in dag_runs:
             # do not consider runs that are executed in the future
@@ -513,14 +513,15 @@ class SchedulerJob(BaseJob):
             # todo: run.task is transient but needs to be set
             run.dag = dag
             # todo: preferably the integrity check happens at dag collection time
-            run.verify_integrity()
-            run.update_state()
+            run.verify_integrity(session=session)
+            run.update_state(session=session)
             if run.state == State.RUNNING:
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
-            tis = run.get_task_instances(session=session, state=(State.NONE,
-                                                                 State.UP_FOR_RETRY))
+            # this needs a fresh session sometimes tis get detached
+            tis = run.get_task_instances(state=(State.NONE,
+                                                State.UP_FOR_RETRY))
 
             # this loop is quite slow as it uses are_dependencies_met for
             # every task (in ti.is_runnable). This is also called in

