airflow-commits mailing list archives

From "Chris Riccomini (JIRA)" <>
Subject [jira] [Commented] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)
Date Thu, 28 Apr 2016 20:39:12 GMT


Chris Riccomini commented on AIRFLOW-14:

If there are two DRJs running simultaneously, this race condition can occur, as far as I know:

# DRJ0 and DRJ1 both see that DAG0 needs to be run.
# DRJ0 checks if the lock is set. It's not.
# DRJ1 checks if the lock is set. It's not.
# DRJ0 sets lock to be owned by DRJ0.
# DRJ0 checks to see who the owner is. It's DRJ0. DRJ0 starts running the DAG.
# DRJ1 sets the lock to be owned by DRJ1.
# DRJ1 checks to see who the owner is. It's DRJ1. DRJ1 starts running the DAG.

The SQL that I wrote above prevents this from happening: the second update (step 6)
fails because, at that point, lock_id is no longer null; it's DRJ0.
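The exact SQL isn't reproduced here, but the idea can be sketched as a compare-and-set UPDATE. The table and column names below (dag_run, lock_id) are assumptions for illustration; the point is that the check and the set happen in a single atomic statement, so there is no window between steps 2/3 and steps 4/6:

```python
import sqlite3

# Hypothetical schema for illustration; the real table/column names from the
# proposal's SQL may differ.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, lock_id TEXT)")
conn.execute("INSERT INTO dag_run (id, lock_id) VALUES (1, NULL)")

def try_lock(conn, run_id, job_id):
    # The UPDATE only matches while lock_id is still NULL, so checking and
    # setting the lock is one atomic statement: a second job's UPDATE
    # matches zero rows instead of silently overwriting the owner.
    cur = conn.execute(
        "UPDATE dag_run SET lock_id = ? WHERE id = ? AND lock_id IS NULL",
        (job_id, run_id),
    )
    conn.commit()
    return cur.rowcount == 1

print(try_lock(conn, 1, "DRJ0"))  # True: DRJ0 acquires the lock
print(try_lock(conn, 1, "DRJ1"))  # False: lock_id is already DRJ0
```

With this shape, step 6 in the sequence above simply fails, and DRJ1 moves on without running the DAG.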

> DagRun Refactor (Scheduler 2.0)
> -------------------------------
>                 Key: AIRFLOW-14
>                 URL:
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Jeremiah Lowin
>            Assignee: Jeremiah Lowin
>              Labels: backfill, dagrun, scheduler
> For full proposal, please see the Wiki:
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they
> should be called DagInstances?). To run a DAG – or to manage the execution of
> a DAG – a DagRun must first be created. This can be done manually (simply by
> creating a DagRun object) or automatically, using methods like
> dag.schedule_dag(). Therefore, both scheduling new runs and introducing
> ad-hoc runs can be done by any process at any time, simply by creating the
> appropriate object.
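A minimal sketch of that idea, assuming a DagRun is just a plain data object (the fields below are guesses based on the proposal text, not the actual Airflow model):

```python
from dataclasses import dataclass
from datetime import datetime

# Sketch only: these fields are assumptions drawn from the proposal text,
# not the real Airflow DagRun model.
@dataclass(frozen=True)
class DagRun:
    dag_id: str
    execution_date: datetime
    state: str = "running"

# An ad-hoc run is introduced simply by creating the object; a scheduler
# would do the same from something like dag.schedule_dag().
run = DagRun(dag_id="example_dag", execution_date=datetime(2016, 4, 28))
```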
> Just creating a DagRun is not enough to actually run the DAG (just as
> creating a TaskInstance is not the same as actually running a task). We need
> a Job for that. The DagRunJob is fairly simple in structure. It maintains a
> set of DagRuns that it is tasked with executing, and loops over that set
> until all the DagRuns either succeed or fail. New DagRuns can be passed to
> the job explicitly via DagRunJob.submit_dagruns() or by defining its
> DagRunJob.collect_dagruns() method, which is called during each loop. When
> the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs
> will not try to execute locked DagRuns. This way, many DagRunJobs can run
> simultaneously in either a local or distributed setting, and can even be
> pointed at the same DagRuns, without worrying about collisions or
> interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
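The loop above can be sketched as follows. The method names submit_dagruns() and collect_dagruns() come from the proposal; everything else (state strings, the processing step, the stop condition) is an assumption filled in for illustration:

```python
# Sketch of the proposed DagRunJob loop, not the actual implementation.
class DagRunJob:
    def __init__(self):
        self.dagruns = set()

    def submit_dagruns(self, runs):
        # Explicit submission path (used by e.g. a Backfill-style job).
        self.dagruns.update(runs)

    def collect_dagruns(self):
        # A Scheduler-style job would pick up ALL active DagRuns here;
        # the default sketch collects nothing.
        return []

    def process_dagrun(self, dagrun):
        # Real code would lock the DagRun, run its ready tasks, and update
        # its state; here we just declare it successful.
        return "success"

    def run(self):
        final_states = {}
        while self.dagruns:
            self.submit_dagruns(self.collect_dagruns())  # collect new dagruns
            for dagrun in list(self.dagruns):
                state = self.process_dagrun(dagrun)
                if state in ("success", "failed"):
                    final_states[dagrun] = state
                    self.dagruns.discard(dagrun)
            # executor/own heartbeat would go here
        return final_states

job = DagRunJob()
job.submit_dagruns(["dag0_2016-04-28"])
```

Calling job.run() then loops until the submitted run reaches a terminal state.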
> By tweaking the DagRunJob, we can easily recreate the behavior of the
> current SchedulerJob and BackfillJob. The Scheduler simply runs forever and
> picks up ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns
> corresponding to the requested start/end dates and submits them to itself
> prior to initiating its loop.

This message was sent by Atlassian JIRA
