airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1124] Do not set all tasks to scheduled in backfill
Date Wed, 19 Apr 2017 15:15:53 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6684597d9 -> 0406462dc


[AIRFLOW-1124] Do not set all tasks to scheduled in backfill

Backfill is supposed to fill in the blanks and not to reschedule all
tasks. This fixes a regression from 1.8.0.

Closes #2247 from bolkedebruin/AIRFLOW-1124
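
In short, only task instances that have no state yet get scheduled by the
backfill; instances that already carry a state keep it at this step (they can
still be picked up later in the backfill loop, e.g. the UP_FOR_RETRY case the
new test exercises). A minimal sketch of that state-setting step, using a
hypothetical helper and plain values instead of Airflow's TaskInstance and
State objects (not Airflow code):

# Hypothetical sketch of the state-setting step changed by this commit.
# Only "blank" (state-less) task instances are marked SCHEDULED; every
# other state is left untouched here.
STATE_NONE = None              # stand-in for airflow.utils.state.State.NONE
STATE_SCHEDULED = "scheduled"  # stand-in for State.SCHEDULED

def set_initial_backfill_states(task_states):
    """Return a copy of task_states with only the blank (None) entries scheduled."""
    return {
        task_id: STATE_SCHEDULED if state is STATE_NONE else state
        for task_id, state in task_states.items()
    }

before = {"op2": "failed", "op3": "skipped", "op6": None}
after = set_initial_backfill_states(before)
assert after == {"op2": "failed", "op3": "skipped", "op6": "scheduled"}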


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0406462d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0406462d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0406462d

Branch: refs/heads/master
Commit: 0406462dc91427793ba40d0f05f321e85dbc6f19
Parents: 6684597
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Wed Apr 19 17:15:46 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Wed Apr 19 17:15:46 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py |  3 ++-
 tests/jobs.py   | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0406462d/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 95f4ad7..b5d68b0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1821,7 +1821,8 @@ class BackfillJob(BaseJob):
 
             for ti in run.get_task_instances():
                 # all tasks part of the backfill are scheduled to run
-                ti.set_state(State.SCHEDULED, session=session)
+                if ti.state == State.NONE:
+                    ti.set_state(State.SCHEDULED, session=session)
                 tasks_to_run[ti.key] = ti
 
             next_run_date = self.dag.following_schedule(next_run_date)
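
For contrast, the removed line amounted to the sketch below: every task
instance, whatever its state, was flipped back to SCHEDULED, which is why a
backfill over an already-run date range in 1.8.0 would reschedule finished or
skipped work instead of only filling in the blanks (same hypothetical
stand-ins as above, not Airflow code):

STATE_SCHEDULED = "scheduled"  # stand-in for airflow.utils.state.State.SCHEDULED

def set_initial_backfill_states_pre_fix(task_states):
    """Pre-fix behavior: force every task instance to SCHEDULED."""
    return {task_id: STATE_SCHEDULED for task_id in task_states}

before = {"op2": "failed", "op3": "skipped", "op6": None}
assert set_initial_backfill_states_pre_fix(before) == {
    "op2": "scheduled",
    "op3": "scheduled",
    "op6": "scheduled",
}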

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0406462d/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e99778a..26a105b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,75 @@ class BackfillJobTest(unittest.TestCase):
             else:
                 self.assertEqual(State.NONE, ti.state)
 
+    def test_backfill_fill_blanks(self):
+        dag = DAG(
+            'test_backfill_fill_blanks',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'},
+        )
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+            op2 = DummyOperator(task_id='op2')
+            op3 = DummyOperator(task_id='op3')
+            op4 = DummyOperator(task_id='op4')
+            op5 = DummyOperator(task_id='op5')
+            op6 = DummyOperator(task_id='op6')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id='test',
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+        executor = TestExecutor(do_update=True)
+
+        session = settings.Session()
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == op1.task_id:
+                ti.state = State.UP_FOR_RETRY
+                ti.end_date = DEFAULT_DATE
+            elif ti.task_id == op2.task_id:
+                ti.state = State.FAILED
+            elif ti.task_id == op3.task_id:
+                ti.state = State.SKIPPED
+            elif ti.task_id == op4.task_id:
+                ti.state = State.SCHEDULED
+            elif ti.task_id == op5.task_id:
+                ti.state = State.UPSTREAM_FAILED
+            # op6 = None
+            session.merge(ti)
+        session.commit()
+        session.close()
+
+        job = BackfillJob(dag=dag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor)
+        self.assertRaisesRegexp(
+            AirflowException,
+            'Some task instances failed',
+            job.run)
+
+        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+        # the run_id should have changed, so a refresh won't work
+        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+        dr = drs[0]
+
+        self.assertEqual(dr.state, State.FAILED)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id in (op1.task_id, op4.task_id, op6.task_id):
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == op2.task_id:
+                self.assertEqual(ti.state, State.FAILED)
+            elif ti.task_id == op3.task_id:
+                self.assertEqual(ti.state, State.SKIPPED)
+            elif ti.task_id == op5.task_id:
+                self.assertEqual(ti.state, State.UPSTREAM_FAILED)
+
     def test_backfill_execute_subdag(self):
         dag = self.dagbag.get_dag('example_subdag_operator')
         subdag_op_task = dag.get_task('section-1')

