airflow-dev mailing list archives

From: "colins@thinknear.com" <col...@thinknear.com>
Subject: SubDagOperator custom on_retry_callback handling for 1.9.0
Date: Fri, 17 Nov 2017 01:58:53 GMT
Hello Airflow Devs, 

This is a continuation of this mailing thread: 

https://lists.apache.org/thread.html/25f07715b834a0e8b70b9e39fad8b82771fb267c33da484f15e61c3e@%3Cdev.airflow.apache.org%3E

We were able to figure out why our SubDagOperator task instances would cycle from
FAILED -> UP_FOR_RETRY -> FAILED.

As described in the thread above, we have a custom on_retry_callback function
that sets all the task instances (within the subdag) to UP_FOR_RETRY. Some other
process (unknown to us at the time) would then set the state of these task instances
back to FAILED, so when the next iteration of the subdag ran, it could not execute
the subdag tasks, since they were already FAILED.

Here is what we found:

A SubDagOperator has its own DagRun entry. Whenever the root task instance inside
the subdag ends up UPSTREAM_FAILED or FAILED, that DagRun is set to FAILED. Since
all of our subdag tasks are configured with 0 retries, no further runs continue
inside the subdag. The subdag itself has retries set to 2, so its on_retry_callback
fires and sets all the subdag task instances to UP_FOR_RETRY. In `jobs.py`, the
SchedulerJob class has a method called `_change_state_for_tis_without_dagrun()`
that changes the state of a DagRun's task instances when that DagRun's state is
not RUNNING. Because the subdag's DagRun had already been set to FAILED, this
method then marked the subdag task instances FAILED again.
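The scheduler pass described above can be sketched with a toy model. This is illustrative only (the function name and dict-based task instances here are simplifications): the real logic lives in `SchedulerJob._change_state_for_tis_without_dagrun()` in Airflow 1.9's `jobs.py` and operates on rows in the metadata database.

```python
# Toy model of the scheduler cleanup pass: task instances in certain
# states are moved to a new state when their DagRun is not RUNNING.

RUNNING, FAILED, UP_FOR_RETRY = "running", "failed", "up_for_retry"

def change_state_for_tis_without_dagrun(task_instances, dag_run_state,
                                        old_states, new_state):
    """Move task instances out of old_states when their DagRun isn't RUNNING."""
    if dag_run_state == RUNNING:
        return  # healthy DagRun: leave the task instances alone
    for ti in task_instances:
        if ti["state"] in old_states:
            ti["state"] = new_state

# A subdag whose on_retry_callback reset its tasks to UP_FOR_RETRY,
# but whose own DagRun was already marked FAILED:
subdag_tis = [{"task_id": "a", "state": UP_FOR_RETRY},
              {"task_id": "b", "state": UP_FOR_RETRY}]
change_state_for_tis_without_dagrun(subdag_tis, FAILED,
                                    [UP_FOR_RETRY], FAILED)
print([ti["state"] for ti in subdag_tis])  # both flipped back to FAILED
```

This is exactly the FAILED -> UP_FOR_RETRY -> FAILED cycle we observed: the callback's UP_FOR_RETRY states never survive the next scheduler pass.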

To work around this, we now also set the subdag's DagRun state to RUNNING in the
on_retry_callback handler, which prevents the scheduler from re-failing the task
instances. With that change, our retries run fine.
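Schematically, the workaround callback looks roughly like the following. This is a simplified sketch (the function name and dict-based objects are illustrative); our actual callback updates the TaskInstance rows and the subdag's DagRun row in the Airflow metadata database via a SQLAlchemy session.

```python
# Sketch of the workaround on_retry_callback for the subdag task.

RUNNING, FAILED, UP_FOR_RETRY = "running", "failed", "up_for_retry"

def subdag_on_retry_callback(subdag_tis, dag_run):
    """Reset the subdag's task instances for retry AND mark its DagRun
    RUNNING so the scheduler's cleanup pass doesn't re-fail them."""
    for ti in subdag_tis:
        ti["state"] = UP_FOR_RETRY
    # The key addition: without this line, _change_state_for_tis_without_dagrun
    # sees a non-RUNNING DagRun and flips the task instances back to FAILED.
    dag_run["state"] = RUNNING

tis = [{"task_id": "a", "state": FAILED}]
run = {"state": FAILED}
subdag_on_retry_callback(tis, run)
print(tis[0]["state"], run["state"])  # up_for_retry running
```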

We have a couple of questions here:

1) This may be an edge case, but handling it this way feels roundabout.  The
comment in the code states the following:

                # Handle cases where a DAG run state is set (perhaps manually) to
                # a non-running state. Handle task instances that belong to
                # DAG runs in those states

                # If a task instance is up for retry but the corresponding DAG run
                # isn't running, mark the task instance as FAILED so we don't try
                # to re-run it.
                self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                          [State.UP_FOR_RETRY],
                                          State.FAILED)

The issue we hit stems from a subdag being both a DagRun and a task, which our
on_retry_callback exposed.  Would it make more sense for a subdag's DagRun to be
exempt from this condition (the condition identified in the comment)?

2) We have not been able to figure out why our old on_retry_callback worked fine in
our testing environment (which doesn't actively run DAGs unless we're testing) but
not in our production environment, which constantly runs DAGs.  We replicated our
sandbox environment to behave exactly like production and scheduled DAGs to run,
but haven't reproduced the behavior above.  Any thoughts on why this might behave
differently in some cases?

Thanks,

Colin
