airflow-commits mailing list archives

From bo...@apache.org
Subject [1/3] incubator-airflow git commit: [AIRFLOW-695] Retries do not execute because dagrun is in FAILED state
Date Tue, 03 Jan 2017 13:28:33 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e9fe64af0 -> f939d7832


[AIRFLOW-695] Retries do not execute because dagrun is in FAILED state

The scheduler checks task instances without taking into account whether
the executor has already reported back. In this case the executor reports
back several iterations later, but the task is queued again nevertheless.
Because a task will not enter the queue while it is considered running,
its state stays "queued" indefinitely, in limbo between the scheduler and
the executor.
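
For illustration only (not part of the commit), here is a minimal, runnable
sketch of the limbo scenario and of the guard that resolves it.
SketchExecutor and schedule() are hypothetical stand-ins for BaseExecutor
and the scheduler loop; the names queued_tasks, running and has_task mirror
those in the diff below.

class SketchExecutor:
    def __init__(self):
        self.queued_tasks = {}  # key -> work accepted but not yet started
        self.running = {}       # key -> work a worker is still executing

    def has_task(self, key):
        # The same membership check this commit adds to BaseExecutor.
        return key in self.queued_tasks or key in self.running


def schedule(executor, candidate_keys):
    # Queue only tasks the executor does not already own; re-queuing a
    # task the executor still tracks would leave it "queued" forever.
    for key in candidate_keys:
        if executor.has_task(key):
            continue
        executor.queued_tasks[key] = "command"


executor = SketchExecutor()
executor.running["ti-1"] = "command"  # executor has not reported back yet
schedule(executor, ["ti-1", "ti-2"])
assert "ti-1" not in executor.queued_tasks  # skipped: still owned by executor
assert "ti-2" in executor.queued_tasks      # unknown task is queued normally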


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c8a4eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c8a4eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c8a4eb34

Branch: refs/heads/master
Commit: c8a4eb34550749464759c6789c8d735d82dfdeb7
Parents: 937142d
Author: root <bolke@xs4all.nl>
Authored: Sun Dec 18 20:19:58 2016 +0000
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Jan 2 14:18:22 2017 +0100

----------------------------------------------------------------------
 airflow/executors/base_executor.py |  9 ++++
 airflow/jobs.py                    |  5 +++
 tests/jobs.py                      | 76 +++++++++++++++++++++++++++++++++
 3 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index d702ff2..7a4065e 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -76,6 +76,15 @@ class BaseExecutor(LoggingMixin):
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def has_task(self, task_instance):
+        """
+        Checks if a task is either queued or running in this executor
+        :param task_instance: TaskInstance
+        :return: True if the task is known to this executor
+        """
+        if task_instance.key in self.queued_tasks or task_instance.key in self.running:
+            return True
+
     def sync(self):
         """
         Sync will get called periodically by the heartbeat method.

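One note on the hunk above: when the key is in neither dict, has_task()
falls through and implicitly returns None rather than False. The truthiness
test added to jobs.py below treats None and False identically, so the guard
behaves correctly either way. For comparison only (the commit keeps the
version above), an explicit-boolean sketch:

    def has_task(self, task_instance):
        """
        Checks if a task is either queued or running in this executor
        :param task_instance: TaskInstance
        :return: True if the task is known to this executor, else False
        """
        return (task_instance.key in self.queued_tasks or
                task_instance.key in self.running)
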
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 81c77a8..819d107 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -989,6 +989,11 @@ class SchedulerJob(BaseJob):
                     # Can't schedule any more since there are no more open slots.
                     break
 
+                if self.executor.has_task(task_instance):
+                    self.logger.debug("Not handling task {} as the executor reports it is running"
+                                      .format(task_instance.key))
+                    continue
+ 
                 if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
                     self.logger.info("Not executing queued {} since {} is paused"
                                      .format(task_instance, task_instance.dag_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 62e88e5..d7dfbe7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -21,6 +21,7 @@ import datetime
 import logging
 import os
 import unittest
+import six
 
 from airflow import AirflowException, settings
 from airflow import models
@@ -29,6 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
@@ -899,6 +901,80 @@ class SchedulerJobTest(unittest.TestCase):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    def test_retry_still_in_executor(self):
+        """
+        Checks that the scheduler does not put a task in limbo when the task
+        is retried while still present in the executor.
+        """
+        executor = TestExecutor()
+        dagbag = DagBag(executor=executor)
+        dagbag.dags.clear()
+        dagbag.executor = executor
+
+        dag = DAG(
+            dag_id='test_retry_still_in_executor',
+            start_date=DEFAULT_DATE)
+        dag_task1 = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            dag=dag,
+            owner='airflow')
+
+        dag.clear()
+        dag.is_subdag = False
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
+
+        @mock.patch('airflow.models.DagBag', return_value=dagbag)
+        @mock.patch('airflow.models.DagBag.collect_dags')
+        def do_schedule(function, function2):
+            # Use an empty file since the above mock will return the
+            # expected DAGs. Also specify only a single file so that it doesn't
+            # try to schedule the above DAG repeatedly.
+            scheduler = SchedulerJob(num_runs=1,
+                                     executor=executor,
+                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                                         "no_dags.py"))
+            scheduler.heartrate = 0
+            scheduler.run()
+
+        do_schedule()
+        self.assertEquals(1, len(executor.queued_tasks))
+
+        def run_with_error(task):
+            try:
+                task.run()
+            except AirflowException:
+                pass
+
+        ti_tuple = six.next(six.itervalues(executor.queued_tasks))
+        (command, priority, queue, ti) = ti_tuple
+        ti.task = dag_task1
+
+        # fail execution
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # do not schedule
+        do_schedule()
+        self.assertTrue(executor.has_task(ti))
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
+        # now that the executor has cleared, the task should be re-queued
+        executor.queued_tasks.clear()
+        do_schedule()
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.QUEUED)
+
     def test_scheduler_run_duration(self):
         """
         Verifies that the scheduler run duration limit is followed.

