Date: Tue, 15 Oct 2019 02:44:00 +0000 (UTC)
From: "Qian Yu (Jira)"
To: commits@airflow.incubator.apache.org
Subject: [jira] [Updated] (AIRFLOW-5648) Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG

     [ https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qian Yu updated AIRFLOW-5648:
-----------------------------
Description:

There are use cases where some external condition has changed and a section of the DAG needs to be re-run after it has already finished. Here is an example I recently encountered:

We have a DAG that runs the following in the morning for execution_date T. The preliminary result of task J is needed at 9am. K, L, and M need to wait for Sensor to pass, so they do not finish until much later in the evening:

{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
     ^                             ^
     |                             |
B >> D >> F >>>>>                Sensor
{code}
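For illustration, the dependencies above could be declared roughly as follows. This is only a sketch: the dag_id, schedule, and DummyOperator placeholders are stand-ins rather than the real operators in our DAG, and only the edges that matter for this discussion (the main chain and Sensor >> K) are wired up.

{code:python}
# Sketch of the DAG shape above (Airflow 1.10.x style), using DummyOperator
# as a stand-in for the real tasks and for the sensor.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="example_rerun_section",
         start_date=days_ago(1),
         schedule_interval="@daily") as dag:
    # Main chain: A >> C >> ... >> M >> Finish
    a, c, e, g, h, i, j, k, l, m, finish = [
        DummyOperator(task_id=t)
        for t in ["A", "C", "E", "G", "H", "I", "J", "K", "L", "M", "Finish"]
    ]
    a >> c >> e >> g >> h >> i >> j >> k >> l >> m >> finish

    # Secondary chain: B >> D >> F >> Sensor; Sensor gates K (and so L, M)
    b, d, f, sensor = [DummyOperator(task_id=t) for t in ["B", "D", "F", "Sensor"]]
    b >> d >> f >> sensor
    sensor >> k
{code}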
Later in the afternoon, at around 3pm, some external condition changes (indicated by Sensor). At that point, we need to re-run task A and all of its downstream tasks that have already run (i.e. A, C, E, G, H, I, J) to reflect the possible changes. Other finished tasks such as B, D, and F do not need to be re-run. The new result of J is needed in the evening by the downstream tasks K, L, and M that have been waiting.

One possible approach is to make the DAG look like this by duplicating the section that needs to be re-run. In the following diagram, A1 runs the same command as A, C1 runs the same command as C, and so on. This mostly works, but it makes the DAG unnecessarily large and it duplicates code, because the tasks A1, C1, E1, G1, H1, I1, J1 are all identical to the original tasks. In this simplified example the duplication does not look too bad, but in the real DAGs I face, task A has many downstream tasks with complex dependencies, and copying all of them is harder. Because of this duplication, the next time someone inserts a new task between E and G, it will be easy to forget to also add it between E1 and G1.

{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
     ^                             ^
     |                             |__________
B >> D >> F >>>>                              |
                                              |
Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
{code}

Instead of duplicating the tasks, I'm proposing to add a ClearTaskOperator. This operator takes an external_task_id as its parameter. When ClearTaskOperator runs, it clears the state of the given external_task_id and of all its downstream tasks, which causes them to re-run. The problem above can then be solved without duplicating any tasks. With ClearTaskOperator, the DAG can look like this:

{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
     ^                             ^
     |                             |
B >> D >> F >>>>                   |
                                   |
Sensor >> Clear_Task_A >>>>>>>>>>>>>
{code}

In the above DAG, Clear_Task_A is a ClearTaskOperator defined as follows. When Clear_Task_A executes, it clears task A and all of its downstream tasks (so in this case A, C, E, G, H, I, and J are cleared and re-run):

{code:python}
Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A", external_task_id="A")
{code}
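For discussion, here is one way ClearTaskOperator could be implemented. This is only a sketch of the proposal (the operator does not exist in Airflow today): it narrows the DAG with DAG.sub_dag() and clears that subset with DAG.clear() for the current execution_date, similar to what the "airflow clear --downstream" CLI command does internally; the exact mechanism and names are open to discussion.

{code:python}
# Possible implementation sketch for the proposed ClearTaskOperator.
import re

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ClearTaskOperator(BaseOperator):
    """Clear external_task_id and all of its downstream tasks in the same
    DAG for the current execution_date, so that they get re-run."""

    @apply_defaults
    def __init__(self, external_task_id, *args, **kwargs):
        super(ClearTaskOperator, self).__init__(*args, **kwargs)
        self.external_task_id = external_task_id

    def execute(self, context):
        dag = context["dag"]
        execution_date = context["execution_date"]

        # Narrow the DAG down to the target task plus everything downstream
        # of it, then clear that subset for this execution_date only.
        sub_dag = dag.sub_dag(
            task_regex=r"^{}$".format(re.escape(self.external_task_id)),
            include_downstream=True,
            include_upstream=False,
        )
        sub_dag.clear(start_date=execution_date, end_date=execution_date)
        self.log.info(
            "Cleared %s and its downstream tasks for %s",
            self.external_task_id, execution_date,
        )
{code}

Wired as in the diagram above (Sensor >> Clear_Task_A, Clear_Task_A >> K), K, L, and M only proceed after the clear has been issued and J has re-run.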
> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> ---------------------------------------------------------------------------
>
>                  Key: AIRFLOW-5648
>                  URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
>              Project: Apache Airflow
>           Issue Type: New Feature
>           Components: operators
>     Affects Versions: 1.10.5
>             Reporter: Qian Yu
>             Priority: Major
>               Labels: ClearTaskOperator, airflow, clear, duplicate, operator, re-run, rerun, task
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)