airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] Eronarn commented on a change in pull request #3584: [AIRFLOW-249] Refactor the SLA mechanism
Date Wed, 10 Oct 2018 16:47:46 GMT
Eronarn commented on a change in pull request #3584: [AIRFLOW-249] Refactor the SLA mechanism
URL: https://github.com/apache/incubator-airflow/pull/3584#discussion_r224157274
 
 

 ##########
 File path: airflow/models.py
 ##########
 @@ -4328,6 +4442,168 @@ def _test_cycle_helper(self, visit_map, task_id):
 
         visit_map[task_id] = DagBag.CYCLE_DONE
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))\
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO: Implement SLA misses for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = session.query(TI).outerjoin(DR, and_(
+                DR.dag_id == TI.dag_id,
+                DR.execution_date == TI.execution_date)).filter(
+                    # Only look at TIs for this DAG.
+                    TI.dag_id == self.dag_id,
+                    # Only look at TIs that *still* exist in this DAG.
+                    TI.task_id.in_(self.task_ids),
+                    # Don't look for success/skip TIs. We check SLAs often, so
+                    # there's little chance that a TI switches to successful
+                    # after an SLA miss but before we notice; and this should
+                    # be a major perf boost (since most TIs are successful or
+                    # skipped).
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED))),
+                    # Only look at specified DagRuns
+                    DR.id.in_(scheduled_dagrun_ids),
+                    # If the DAGRun is SUCCEEDED, then everything has gone
+                    # according to plan. But if it's FAILED, someone may be
+                    # coming to fix it, and SLAs for tasks in it will still
+                    # matter.
+                    DR.state != State.SUCCESS).all()
+        else:
+            scheduled_tis = []
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.info("send_sla_notifications was called without any SLA "
+                          "notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id != sla_miss.dag_id,
 
 Review comment:
   Oops, that should also be `==` (so that we're querying for TIs matching the exact DAG ID,
task ID, and execution date of the SLA miss).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message