airflow-commits mailing list archives

From "Adarsh MN (Jira)" <>
Subject [jira] [Assigned] (AIRFLOW-4525) Trigger Dag Operator causes duplicate key exceptions and can cause runaway dag spawning as it is not atomic at the DB level (on Postgres at least.)
Date Mon, 16 Sep 2019 10:31:00 GMT


Adarsh MN reassigned AIRFLOW-4525:

    Assignee: Adarsh MN

> Trigger Dag Operator causes duplicate key exceptions and can cause runaway dag spawning
as it is not atomic at the DB level (on Postgres at least.)
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>                 Key: AIRFLOW-4525
>                 URL:
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun
>    Affects Versions: 1.10.3
>            Reporter: Tony Brookes
>            Assignee: Adarsh MN
>            Priority: Blocker
> When using the TriggerDagRunOperator there is a problem in the code which loops round
subdags scheduling them.  You will not see this issue if you only have one level of sub dag,
but if your sub dags have sub dags then you will see it.
> The code pops an item off the list (non unique) and schedules it.  It then appends all
sub dags of the dag it popped off the list to the current list.  It keeps doing this until
the list is empty.
> The problem is that <<top level dag>>.subdags returns _*all*_ subdags at
_*all*_ levels.  So when you process a <<top level dag.first level subdag>> it
calls <<first level subdag>>.subdags and once again this will append all its
subdags, _*which are already in the list*_.  Thus you are now certain you will get a duplicate
key exception as the same dag ID and run ID are present twice.
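> The loop described above can be reproduced with a minimal sketch.  FakeDag and collect_naive
are hypothetical stand-ins written for illustration, not Airflow's actual classes or code; the
only assumption carried over from the report is that .subdags returns subdags at all levels.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG; not Airflow's real class."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self.children = list(children)

    @property
    def subdags(self):
        # As described in the report: returns subdags at ALL levels,
        # not just the direct children.
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.subdags)
        return found


def collect_naive(top):
    """Pop a dag, record it, append all of its subdags, repeat until empty."""
    pending = [top]
    scheduled = []
    while pending:
        dag = pending.pop()
        scheduled.append(dag.dag_id)
        pending.extend(dag.subdags)  # re-appends dags already in the list
    return scheduled


# Two levels of nesting: top -> top.first -> top.first.second
leaf = FakeDag("top.first.second")
first = FakeDag("top.first", [leaf])
top = FakeDag("top", [first])

scheduled = collect_naive(top)
duplicates = sorted({d for d in scheduled if scheduled.count(d) > 1})
print(scheduled)
print(duplicates)
```

With only one level of sub dag the list contains no repeats; with two levels the innermost
dag ID is recorded twice, which is the condition for the duplicate key error.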
> Up to and including 1.10.2 this is not a significant problem most of the time.  You
see the duplicate key errors in the logs but it does not cause the operator to raise and hence
the task actually succeeds.  That said, you do get a load of "running" sub dags in the console
which never really do anything as they aren't invoked from the parent dag when it wants them
to run and hence have no "task instance" connection to that dag.
> *+However, in 1.10.3 this causes havoc.+*
> Firstly, it no longer exits cleanly.  It causes the operator to raise an error and so
it fails.  Worse, since the statements it has executed to schedule the dags are _*not*_ in
the same transaction, all the dags before the first duplicate _*are triggered*_.  But
since the task will subsequently be retried (if configured) _*they will be triggered again.*_
 Because the logic to generate the run ID uses now() as part of the key it generates, subsequent
invocations will have a different run ID and hence will cause all the dags before the first
duplicate exception to be scheduled repeatedly, up to the maximum retry limit.  You still
get all the orphaned sub dag entries I mentioned from 1.10.2, but you get many many copies of
> I'm not sure what the best fix is (or if it's my place to suggest one) but from what
I've seen the cleanest approach is either to use a set, to avoid duplicate entries, rather
than the current list based approach OR continue to use the list with its "pop" semantics
but keep track of items already processed and avoid re-appending them.
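> A minimal sketch of the second option (keep the list and its pop semantics, but track what
has already been queued) might look like the following.  FakeDag and collect_unique are
hypothetical names used for illustration only; this is not the actual Airflow fix.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG; not Airflow's real class."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self.children = list(children)

    @property
    def subdags(self):
        # Returns subdags at all levels, as described in the report.
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.subdags)
        return found


def collect_unique(top):
    """Same pop-and-append loop, but each dag ID is queued at most once."""
    pending = [top]
    seen = {top.dag_id}
    ordered = []
    while pending:
        dag = pending.pop()
        ordered.append(dag.dag_id)
        for sub in dag.subdags:
            if sub.dag_id not in seen:  # skip anything already queued
                seen.add(sub.dag_id)
                pending.append(sub)
    return ordered


leaf = FakeDag("top.first.second")
first = FakeDag("top.first", [leaf])
top = FakeDag("top", [first])

ordered = collect_unique(top)
print(ordered)
```

The seen set gives constant-time membership checks while the list preserves processing
order, so every dag ID appears exactly once in the result.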
> This would fix the current problem, but to be honest it feels semantically *_incorrect_* to
trigger the sub dags in this way.  The top level dag invokes the sub dags as task instances
like any other and you're going behind its back invoking them this way.  Moreover, the semantic
contract of the TriggerDagRunOperator is that it takes a single dag ID as input, implicitly
creating the expectation that this is the _*only dag which will be triggered.*_  Scheduling
the sub dags as part of doing this feels wrong and actually creates an error whilst doing
nothing to help the operation of the platform (unless there is a different configuration set
up I am not thinking of which is entirely possible.)
> But as far as I can discern, if you _*only*_ trigger the top level dag you've been _*asked*_
to trigger then actually, everything will work just fine.  The SubDagOperator which wraps
the sub dags will trigger the sub dag anyway at the right time, based on whatever dependencies
are in the top level dag (which might be none, in which case any sub dags will get scheduled
automatically).  The reason I know this of course is that the first time you trigger the top
level DAG in the UI, only one row is written to the dag_run table, only the top level dag
is triggered, and yet, it works just fine...
> If there is some scenario which should still require the sub dags to be triggered, I
think it's important that this sort of operator is atomic (or at the very least idempotent.)
 Otherwise you can risk significant issues in a production environment with "over-triggering"
Dags.  Even if concurrent dag configuration prevents them from running concurrently the list
of scheduled dags can in theory grow forever (or to max_retries of the TriggerDagRunOperator
task) and can cause some significant undesirable side effects.  From what I can see, using
a transaction would perhaps be complex (and not cross platform friendly), but at the very
least the dag entries should perhaps be added to the DB with _*no*_ state and then convert
them all to RUNNING once you know they've all successfully inserted and any primary key issues
are resolved.  The state is not part of the primary key so this would not cause a problem.
 The worst case outcome under this approach arises only if some form of DB failure occurs between
the inserts and the state update.  This potentially means the dags never started,
but I think that's a "better worst case" than the current situation where multiple unintended
triggers can happen.
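> The "insert with no state, then flip to RUNNING" idea can be sketched against an in-memory
SQLite database.  Everything here is an assumption made for illustration: the two-column
primary key, the simplified dag_run table, the trigger_all helper and the run ID string are
hypothetical and do not reflect Airflow's real schema or code.

```python
import sqlite3


def trigger_all(conn, dag_ids, run_id):
    """Stage rows with no state, then mark them running in one statement."""
    inserted = 0
    for dag_id in dag_ids:
        try:
            conn.execute(
                "INSERT INTO dag_run (dag_id, run_id, state) VALUES (?, ?, NULL)",
                (dag_id, run_id),
            )
            inserted += 1
        except sqlite3.IntegrityError:
            # Duplicate (dag_id, run_id): the row is already staged, so skip it.
            pass
    # Only once every insert has been attempted are the staged rows activated.
    conn.execute(
        "UPDATE dag_run SET state = 'running' WHERE run_id = ? AND state IS NULL",
        (run_id,),
    )
    return inserted


# isolation_level=None puts sqlite3 in autocommit mode, so no wrapping
# transaction is relied upon (matching the constraint raised in the report).
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE dag_run ("
    " dag_id TEXT NOT NULL,"
    " run_id TEXT NOT NULL,"
    " state TEXT,"
    " PRIMARY KEY (dag_id, run_id))"
)

# The last entry is a deliberate duplicate, as the buggy loop would produce.
dag_ids = ["top", "top.first", "top.first.second", "top.first.second"]
inserted = trigger_all(conn, dag_ids, "trig__2019-09-16T10:31:00")
rows = conn.execute("SELECT dag_id, state FROM dag_run ORDER BY dag_id").fetchall()
print(inserted, rows)
```

If the process dies between the inserts and the update, the rows remain with no state and
nothing starts, which is the "better worst case" described above.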
> I have set this as a Blocker because I cannot find any way around it without modifying
the core code myself and we, like many others I suspect, have dags which start with a sensor
waiting for incoming data and then process it and trigger another instance of themselves to
wait once again.
> We are currently using 1.10.2 but this is a blocker for us upgrading to 1.10.3.  I can't
find any way to stop the duplicate key errors from happening whatever I do unless I completely
re-work my entire dag layout, which just makes them look highly complex and would obliterate
the nice modular approach we've managed to build in our platform (and indeed which the Airflow
platform encourages.)
> Please let me know if you need anything else.  I've never contributed to an Apache project
and would need a little guidance and support if I were to try to address it myself.  I'm
willing to try though as I use Airflow a lot and would love to give something back.  Would
just need a little early pointing in the right direction. :)

This message was sent by Atlassian Jira
