airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Qian Yu (Jira)" <>
Subject [jira] [Created] (AIRFLOW-5648) Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
Date Sun, 13 Oct 2019 13:19:00 GMT
Qian Yu created AIRFLOW-5648:

             Summary: Add ClearTaskOperator to allow clearing/re-running tasks from within
                 Key: AIRFLOW-5648
             Project: Apache Airflow
          Issue Type: New Feature
          Components: operators
    Affects Versions: 1.10.5
            Reporter: Qian Yu

There are use cases where some external conditions have changed and a section of the DAG needs
to be re-run (after they have already finished previously). Here's such an example I recently

We have a DAG that runs the following in the morning for execution_date T. The preliminary
result of task J is needed at 9am in the morning:
A >> C >> E >> G >> H >> I >> J >> Finish
B >> D >> F>>>>>
And later on in the afternoon at 3pm, some external condition changes (indicated by a Sensor).
At that point, we need to re-run task A and all its downstream tasks to reflect the possible
changes. The new results of J is needed in the evening.

One possible approach is to make the DAG look like this by duplicating the section that needs
to be re-run. In the following image, A1 runs the same command as A, C1 runs the same command
as C, etc. This mostly works, but it causes the DAG to look unnecessarily large. It also causes
code duplication because the tasks A1, C1, E1, G1, H1, I1 are all identical to the original
tasks. They are duplicated because we need to re-run the same task in the afternoon. In this
simplified example, the duplication does not look too bad, but in the real examples I faced,
task A has many downstream tasks with complex dependencies. Copying all of them is more difficult.
Also, because of these duplication, the next time someone updates the DAG and inserts a new
task in the middle of E and G, it'll be hard to remember to add it in between E1 and G1 as
A >> C >> E >> G >> H >> I >>  J >>>>>>>>>
               ^                             ^
               |                             |
B >> D >> F>>>>                              |
Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >>>>>>>
Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator. This operator
takes an external_task_id as its parameter. When ClearTaskOperator runs, it clears the state
of the given external_task_id and all its downstream tasks. This will cause them to re-run.
So the problem I'm facing can be tackled without duplicating all those tasks. With ClearTaskOperator,
the DAG can look like this. 
A >> C >> E >> G >> H >> I >>  J >>>>>>>>>>
               ^                              ^
               |                              |
B >> D >> F>>>>                               |
Sensor >> Clear_Task_A >>>>>>>>>>>>>>>>>>>>>>>
In the above DAG, Clear_Task_A is a ClearTaskOperator defined like this. When Clear_Task_A
executes, it clears task A and all its downstream tasks.
Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A", external_task_id="A") {code}

This message was sent by Atlassian Jira

View raw message