airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From saguz...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1334] Check if tasks are backfill on scheduler in a join
Date Wed, 21 Jun 2017 21:57:50 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/aguziel-use-join-apache [created] 436fe71c8


[AIRFLOW-1334] Check if tasks are backfill on scheduler in a join


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/436fe71c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/436fe71c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/436fe71c

Branch: refs/heads/aguziel-use-join-apache
Commit: 436fe71c89473e2da3b91e2d97166da06409a0f1
Parents: 9958aa9
Author: Alex Guziel <alex.guziel@airbnb.com>
Authored: Wed Jun 21 14:56:36 2017 -0700
Committer: Alex Guziel <alex.guziel@airbnb.com>
Committed: Wed Jun 21 14:57:43 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 11 ++++-------
 tests/jobs.py   | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/436fe71c/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2b4350e..e16c277 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,8 +35,7 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
-from sqlalchemy import update
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -990,11 +989,14 @@ class SchedulerJob(BaseJob):
         # Get all the queued task instances from associated with scheduled
         # DagRuns.
         TI = models.TaskInstance
+        DR = models.DagRun
         task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .filter(TI.state.in_(states))
+            .outerjoin(DR, and_(DR.dag_id==TI.dag_id, DR.execution_date==TI.execution_date))
+            .filter(or_(DR.run_id == None, not_(DR.run_id.like('%backfill%'))))
             .all()
         )
 
@@ -1053,11 +1055,6 @@ class SchedulerJob(BaseJob):
                                      .format(task_instance, task_instance.dag_id))
                     continue
 
-                # todo: remove this logic when backfills will be part of the scheduler
-                dag_run = task_instance.get_dagrun()
-                if dag_run and dag_run.is_backfill:
-                    continue
-
                 # Check to make sure that the task concurrency of the DAG hasn't been
                 # reached.
                 dag_id = task_instance.dag_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/436fe71c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5c04b05..e051183 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -636,6 +636,60 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler.heartrate = 0
         scheduler.run()
 
+    def test_execute_task_instances_no_dagrun_task_will_execute(self):
+        """
+        Tests that tasks without dagrun still get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_concurrency'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.SCHEDULED
+        ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1)
+        session.merge(ti1)
+        session.commit()
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.QUEUED, ti1.state)
+
+    def test_execute_task_instances_backfill_tasks_wont_execute(self):
+        """
+        Tests that backfill tasks won't get executed.
+        """
+        dag_id = 'SchedulerJobTest.test_concurrency'
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        dr1.run_id = 'blaH_backfill_blah'
+        ti1 = TI(task1, dr1.execution_date)
+        ti1.refresh_from_db()
+        ti1.state = State.SCHEDULED
+        session.merge(ti1)
+        session.merge(dr1)
+        session.commit()
+
+        self.assertTrue(dr1.is_backfill)
+
+        scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+        ti1.refresh_from_db()
+        self.assertEquals(State.SCHEDULED, ti1.state)
+
     def test_concurrency(self):
         dag_id = 'SchedulerJobTest.test_concurrency'
         task_id_1 = 'dummy_task'


Mime
View raw message