airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-558] Add Support for dag.catchup=(True|False) Option
Date Fri, 13 Jan 2017 11:40:07 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e0f5c0cb8 -> 1caaceb38


[AIRFLOW-558] Add Support for dag.catchup=(True|False) Option

Added a dag.catchup option and modified the scheduler to look at
the value when scheduling DagRuns (by moving dag.start_date up to
dag.previous_schedule). Also added a config option
catchup_by_default (defaults to True) that allows users to set
this to False for all DAGs without modifying the existing DAG
definitions.

In addition, we added a test to jobs.py (test_dag_catchup_option).

Closes #1830 from btallman/NoBackfill_clean_feature
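
For context, the new option is consumed as a plain DAG constructor
argument, as the docs hunk further down shows. A minimal usage sketch
(the dag_id here is illustrative, not taken from the commit):

    from datetime import datetime
    from airflow import DAG

    # catchup=False: the scheduler creates a run only for the most recent
    # completed interval instead of backfilling every interval since
    # start_date.
    dag = DAG(
        'example_no_catchup',              # hypothetical dag_id
        start_date=datetime(2015, 12, 1),
        schedule_interval='@daily',
        catchup=False,
    )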


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1caaceb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1caaceb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1caaceb3

Branch: refs/heads/master
Commit: 1caaceb38803265fc7aba21219e2df6dc837d804
Parents: e0f5c0c
Author: Benjamin Tallman <btallman@gmail.com>
Authored: Fri Jan 13 12:39:55 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri Jan 13 12:39:55 2017 +0100

----------------------------------------------------------------------
 airflow/configuration.py                |  10 +++
 airflow/jobs.py                         |  23 ++++++
 airflow/models.py                       |  55 +++++++++++++--
 airflow/ti_deps/deps/prev_dagrun_dep.py |  20 ++++--
 docs/scheduler.rst                      |  52 ++++++++++++++
 tests/jobs.py                           | 102 +++++++++++++++++++++++++++
 6 files changed, 252 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 1f2eafa..6778464 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -356,6 +356,14 @@ child_process_log_directory = /tmp/airflow/scheduler/logs
 # associated task instance as failed and will re-schedule the task.
 scheduler_zombie_task_threshold = 300
 
+# Turn off scheduler catchup by setting this to False.
+# Default behavior is unchanged and command-line backfills still work,
+# but the scheduler will not do scheduler catchup if this is False.
+# It can also be set on a per-DAG basis in the DAG definition (catchup).
+catchup_by_default = True
+
 # Statsd (https://github.com/etsy/statsd) integration settings
 statsd_on = False
 statsd_host = localhost
@@ -486,6 +494,8 @@ job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 authenticate = true
 max_threads = 2
+catchup_by_default = True
+scheduler_zombie_task_threshold = 300
 """
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 819d107..89d0502 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -729,6 +729,25 @@ class SchedulerJob(BaseJob):
             if dag.schedule_interval == '@once' and last_scheduled_run:
                 return None
 
+            # don't do scheduler catchup for DAGs that don't have catchup set to True
+            if not dag.catchup:
+                # The logic is that we move start_date up until
+                # one period before, so that datetime.now() is AFTER
+                # the period end, and the job can be created...
+                now = datetime.now()
+                next_start = dag.following_schedule(now)
+                last_start = dag.previous_schedule(now)
+                if next_start <= now:
+                    new_start = last_start
+                else:
+                    new_start = dag.previous_schedule(last_start)
+
+                if dag.start_date:
+                    if new_start >= dag.start_date:
+                        dag.start_date = new_start
+                else:
+                    dag.start_date = new_start
+
             next_run_date = None
             if not last_scheduled_run:
                 # First run
@@ -756,6 +775,10 @@ class SchedulerJob(BaseJob):
                 self.logger.debug("Dag start date: {}. Next run date: {}"
                                   .format(dag.start_date, next_run_date))
 
+            # don't ever schedule in the future
+            if next_run_date > datetime.now():
+                return
+
             # this structure is necessary to avoid a TypeError from concatenating
             # NoneType
             if dag.schedule_interval == '@once':
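
The start-date advancement above is easier to follow in isolation. Below
is a standalone sketch of the same arithmetic, assuming a fixed one-hour
interval on a simple grid; following_schedule and previous_schedule are
stand-ins for the DAG methods of the same name, not the croniter-backed
originals:

    from datetime import datetime, timedelta

    INTERVAL = timedelta(hours=1)   # hypothetical fixed schedule interval
    EPOCH = datetime(2015, 1, 1)    # arbitrary grid origin for the sketch

    def previous_schedule(dt):
        """Last grid boundary strictly before dt."""
        boundary = EPOCH + ((dt - EPOCH) // INTERVAL) * INTERVAL
        return boundary - INTERVAL if boundary == dt else boundary

    def following_schedule(dt):
        """Next grid boundary after dt (dt itself when dt lies on the grid)."""
        return previous_schedule(dt) + INTERVAL

    def advanced_start_date(now):
        """Mirror of the scheduler logic above: move start_date to one
        period before now, so now falls AFTER that period's end and a
        DagRun for it can be created immediately."""
        next_start = following_schedule(now)
        last_start = previous_schedule(now)
        return last_start if next_start <= now else previous_schedule(last_start)

    # At 10:30 the advanced start date is 09:00: the 09:00-10:00 period
    # has already ended, so its run can be scheduled right away.
    assert advanced_start_date(datetime(2015, 1, 2, 10, 30)) == datetime(2015, 1, 2, 9, 0)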

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0bd744e..c53368f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1023,12 +1023,28 @@ class TaskInstance(Base):
     @provide_session
     def previous_ti(self, session=None):
         """ The task instance for the task that ran before this task instance """
-        return session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task.task_id,
-            TaskInstance.execution_date ==
-            self.task.dag.previous_schedule(self.execution_date),
-        ).first()
+
+        dag = self.task.dag
+        if dag:
+            dr = self.get_dagrun(session=session)
+            if not dr:
+                # Means that this TI is NOT being run from a DR, but from a catchup
+                previous_scheduled_date = dag.previous_schedule(self.execution_date)
+                if not previous_scheduled_date:
+                    return None
+                else:
+                    return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
+
+            if dag.catchup:
+                last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None
+
+            else:
+                last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
+
+            if last_dagrun:
+                return last_dagrun.get_task_instance(self.task_id, session=session)
+
+        return None
 
     @provide_session
     def are_dependencies_met(
@@ -2540,6 +2556,8 @@ class DAG(BaseDag, LoggingMixin):
     :type sla_miss_callback: types.FunctionType
     :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
     :type orientation: string
+    :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
+    "type catchup: bool"
     """
 
     def __init__(
@@ -2557,6 +2575,7 @@ class DAG(BaseDag, LoggingMixin):
             dagrun_timeout=None,
             sla_miss_callback=None,
             orientation=configuration.get('webserver', 'dag_orientation'),
+            catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
             params=None):
 
         self.user_defined_macros = user_defined_macros
@@ -2597,6 +2616,7 @@ class DAG(BaseDag, LoggingMixin):
         self.dagrun_timeout = dagrun_timeout
         self.sla_miss_callback = sla_miss_callback
         self.orientation = orientation
+        self.catchup = catchup
 
         self._comps = {
             'dag_id',
@@ -3848,6 +3868,29 @@ class DagRun(Base):
         return self.dag
 
     @provide_session
+    def get_previous_dagrun(self, session=None):
+        """The previous DagRun, if there is one"""
+
+        return session.query(DagRun).filter(
+            DagRun.dag_id == self.dag_id,
+            DagRun.execution_date < self.execution_date
+        ).order_by(
+            DagRun.execution_date.desc()
+        ).first()
+
+    @provide_session
+    def get_previous_scheduled_dagrun(self, session=None):
+        """The previous, SCHEDULED DagRun, if there is one"""
+
+        if not self.dag:
+            return None
+
+        return session.query(DagRun).filter(
+            DagRun.dag_id == self.dag_id,
+            DagRun.execution_date == self.dag.previous_schedule(self.execution_date)
+        ).first()
+
+    @provide_session
     def update_state(self, session=None):
         """
         Determines the overall state of the DagRun based on the state
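
The two lookups added above differ once manually triggered runs sit
between scheduled ones. A hedged sketch contrasting them (illustrative
helper, not part of the commit):

    def previous_dagruns(ti):
        """Contrast the two DagRun lookups for a task instance ti."""
        dr = ti.get_dagrun()
        if dr is None:
            return None, None
        # Grid-aligned: the run whose execution_date equals
        # dag.previous_schedule(dr.execution_date); skips manual runs.
        prev_scheduled = dr.get_previous_scheduled_dagrun()
        # Most recent run by execution_date, scheduled or manually triggered.
        prev_any = dr.get_previous_dagrun()
        return prev_scheduled, prev_any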

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/airflow/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 82355ec..2fce704 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -39,10 +39,22 @@ class PrevDagrunDep(BaseTIDep):
             raise StopIteration
 
         # Don't depend on the previous task instance if we are the first task
-        if ti.execution_date == ti.task.start_date:
-            yield self._passing_status(
-                reason="This task instance was the first task instance for it's task.")
-            raise StopIteration
+        dag = ti.task.dag
+        if dag.catchup:
+            if ti.execution_date == ti.task.start_date:
+                yield self._passing_status(
+                    reason="This task instance was the first task instance for its task.")
+                raise StopIteration
+
+        else:
+
+            dr = ti.get_dagrun()
+            last_dagrun = dr.get_previous_dagrun() if dr else None
+
+            if not last_dagrun:
+                yield self._passing_status(
+                    reason="This task instance was the first task instance for its task.")
+                raise StopIteration
 
         previous_ti = ti.previous_ti
         if not previous_ti:
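
Restated outside the dep-class machinery, the modified rule reduces to
the following; a sketch grounded in the diff above, not the actual
BaseTIDep API:

    def is_first_task_instance(ti):
        """With catchup on, 'first' means execution_date equals the
        task's start_date; with catchup off, it means no earlier
        DagRun exists at all."""
        dag = ti.task.dag
        if dag.catchup:
            return ti.execution_date == ti.task.start_date
        dr = ti.get_dagrun()
        return dr is None or dr.get_previous_dagrun() is None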

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/docs/scheduler.rst
----------------------------------------------------------------------
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index 9c8a618..749d58a 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -17,6 +17,9 @@ the run stamped ``2016-01-01`` will be trigger soon after ``2016-01-01T23:59``.
 In other words, the job instance is started once the period it covers
 has ended.
 
+**Let's Repeat That** The scheduler runs your job one ``schedule_interval`` AFTER the
+start date, at the END of the period.
+
 The scheduler starts an instance of the executor specified in the your
 ``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
 executed as subprocesses; in the case of ``CeleryExecutor`` and
@@ -72,6 +75,55 @@ should be triggered and come to a crawl. It might also create undesired
 processing when changing the shape of your DAG, by say adding in new
 tasks.
 
+Backfill and Catchup
+''''''''''''''''''''
+
+An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a
+series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of
+Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine
+the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any
+interval that has not been run (or has been cleared). This concept is called Catchup.
+
+If your DAG is written to handle its own catchup (i.e. not limited to the interval, but instead to "Now",
+for instance), then you will want to turn catchup off, either on the DAG itself with ``dag.catchup = False``
+or by default at the configuration file level with ``catchup_by_default = False``. This instructs the
+scheduler to create a DAG Run only for the most current instance of the DAG interval series.
+
+.. code:: python
+
+    """
+    Code that goes along with the Airflow tutorial located at:
+    https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
+    """
+    from airflow import DAG
+    from airflow.operators.bash_operator import BashOperator
+    from datetime import datetime, timedelta
+
+
+    default_args = {
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'start_date': datetime(2015, 12, 1),
+        'email': ['airflow@airflow.com'],
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5),
+        'schedule_interval': '@hourly',
+    }
+
+    dag = DAG('tutorial', catchup=False, default_args=default_args)
+
+In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM
+(or from the command line), a single DAG Run will be created with an ``execution_date`` of
+2016-01-01, and the next one will be created just after midnight on the morning of 2016-01-03
+with an execution date of 2016-01-02.
+
+If the ``dag.catchup`` value had been True instead, the scheduler would have created a DAG Run
+for each completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02,
+as that interval hasn't completed), and the scheduler would execute them sequentially. This
+behavior is great for atomic datasets that can easily be split into periods. Turning catchup
+off is great if your DAG Runs perform backfill internally.
+
 External Triggers
 '''''''''''''''''
 

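A quick worked check of the dates in the docs example above, assuming
the effective interval is daily as the narrative implies:

    from datetime import datetime, timedelta

    interval = timedelta(days=1)
    picked_up_at = datetime(2016, 1, 2, 6, 0)   # scheduler first sees the DAG

    # The most recent *completed* period is [2016-01-01, 2016-01-02); a run
    # is stamped with its period's start, so execution_date is 2016-01-01.
    period_start = picked_up_at.replace(hour=0, minute=0, second=0, microsecond=0)
    execution_date = period_start - interval
    assert execution_date == datetime(2016, 1, 1)

    # The 2016-01-02 run is only created once its period has ended, i.e.
    # just after midnight on 2016-01-03.
    next_run_created_after = period_start + interval
    assert next_run_created_after == datetime(2016, 1, 3)
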
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1caaceb3/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 32c615d..dab7a47 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1101,3 +1101,105 @@ class SchedulerJobTest(unittest.TestCase):
             running_date = 'Except'
 
         self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')
+
+    def test_dag_catchup_option(self):
+        """
+        Test to check that a DAG with catchup = False only schedules beginning now,
+        not back to the start date
+        """
+
+        now = datetime.datetime.now()
+        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
+            minute=0, second=0, microsecond=0)
+        three_minutes_ago = now - datetime.timedelta(minutes=3)
+        two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)
+
+        START_DATE = six_hours_ago_to_the_hour
+        DAG_NAME1 = 'no_catchup_test1'
+        DAG_NAME2 = 'no_catchup_test2'
+        DAG_NAME3 = 'no_catchup_test3'
+
+        default_args = {
+            'owner': 'airflow',
+            'depends_on_past': False,
+            'start_date': START_DATE
+
+        }
+        dag1 = DAG(DAG_NAME1,
+                  schedule_interval='* * * * *',
+                  max_active_runs=1,
+                  default_args=default_args
+                  )
+
+        default_catchup = configuration.getboolean('scheduler', 'catchup_by_default')
+        # Test configs have catchup by default ON
+
+        self.assertEqual(default_catchup, True)
+
+        # Correct default?
+        self.assertEqual(dag1.catchup, True)
+
+        dag2 = DAG(DAG_NAME2,
+                  schedule_interval='* * * * *',
+                  max_active_runs=1,
+                  catchup=False,
+                  default_args=default_args
+                  )
+
+        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2)
+        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2)
+        run_this_2.set_upstream(run_this_1)
+        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2)
+        run_this_3.set_upstream(run_this_2)
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag2.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag2.clear()
+
+        dr = scheduler.create_dag_run(dag2)
+
+        # We had better get a dag run
+        self.assertIsNotNone(dr)
+
+        # The DR should be scheduled in the last 3 minutes, not 6 hours ago
+        self.assertGreater(dr.execution_date, three_minutes_ago)
+
+        # The DR should be scheduled BEFORE now
+        self.assertLess(dr.execution_date, datetime.datetime.now())
+
+        dag3 = DAG(DAG_NAME3,
+                  schedule_interval='@hourly',
+                  max_active_runs=1,
+                  catchup=False,
+                  default_args=default_args
+              )
+
+        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
+        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
+        run_this_2.set_upstream(run_this_1)
+        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3)
+        run_this_3.set_upstream(run_this_2)
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag3.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag3.clear()
+
+        dr = None
+        dr = scheduler.create_dag_run(dag3)
+
+        # We had better get a dag run
+        self.assertIsNotNone(dr)
+
+        # The DR should be scheduled in the last two hours, not 6 hours ago
+        self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
+
+        # The DR should be scheduled BEFORE now
+        self.assertLess(dr.execution_date, datetime.datetime.now())

