airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-78] airflow clear leaves dag_runs
Date Wed, 10 Aug 2016 02:38:42 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master f1abffa38 -> 197c9050e


[AIRFLOW-78] airflow clear leaves dag_runs

Fix a bug in the scheduler where DAG runs cleared via the CLI would be picked up without first
checking max_active_runs, resulting in too many simultaneous DAG runs.

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-78

Testing Done:
- Expanded the jobs.test_scheduler_verify_max_active_runs test to verify that the scheduler
respects max_active_runs

Fix a bug in the scheduler where DAG runs cleared via the CLI would be picked up without first
checking max_active_runs, resulting in too many simultaneous DAG runs.

Closes #1716 from normster/clear_dagrun


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/197c9050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/197c9050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/197c9050

Branch: refs/heads/master
Commit: 197c9050ef3a142c18aa97819da48ee8cadbf8d8
Parents: f1abffa
Author: Norman Mu <norman@agari.com>
Authored: Tue Aug 9 19:38:23 2016 -0700
Committer: Siddharth Anand <siddharthanand@yahoo.com>
Committed: Tue Aug 9 19:38:23 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  4 ++++
 airflow/models.py |  9 +++++++++
 tests/jobs.py     | 18 ++++++++++++++----
 3 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/197c9050/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c07c411..9580435 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1057,6 +1057,10 @@ class SchedulerJob(BaseJob):
         """
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
+            if dag.reached_max_runs:
+                self.logger.info("Not processing DAG {} since its max runs has been reached"
+                                .format(dag.dag_id))
+                continue
             if dag.is_paused:
                 self.logger.info("Not processing DAG {} since it's paused"
                                  .format(dag.dag_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/197c9050/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 182f7cc..4ccf2a1 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2879,6 +2879,15 @@ class DAG(BaseDag, LoggingMixin):
                 l += task.subdag.subdags
         return l
 
+    @property
+    def reached_max_runs(self):
+        active_runs = DagRun.find(
+            dag_id=self.dag_id,
+            state=State.RUNNING,
+            external_trigger=False
+        )
+        return len(active_runs) >= self.max_active_runs
+
     def resolve_template_files(self):
         for t in self.tasks:
             t.resolve_template_files()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/197c9050/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e86b9da..351268a 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -20,6 +20,7 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
+import time
 import unittest
 
 from airflow import AirflowException, settings
@@ -609,14 +610,23 @@ class SchedulerJobTest(unittest.TestCase):
         session.commit()
         session.close()
 
-        scheduler = SchedulerJob()
-        dag.clear()
+        scheduler = SchedulerJob(dag.dag_id,
+                                run_duration=1)
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
-        dr = scheduler.create_dag_run(dag)
-        self.assertIsNone(dr)
+        dr2 = scheduler.create_dag_run(dag)
+        self.assertIsNone(dr2)
+
+        dag.clear()
+
+        dag.max_active_runs = 0
+        scheduler.run()
+
+        session = settings.Session()
+        self.assertEqual(
+            len(session.query(TI).filter(TI.dag_id == dag.dag_id).all()), 0)
 
     def test_scheduler_fail_dagrun_timeout(self):
         """


Mime
View raw message