Date: Thu, 15 Nov 2018 13:23:00 +0000 (UTC)
From: "Greg H (JIRA)"
To: commits@airflow.incubator.apache.org
Subject: [jira] [Closed] (AIRFLOW-3182) 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks

     [ https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg H closed AIRFLOW-3182.
---------------------------
    Resolution: Feedback Received

> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Greg H
>            Priority: Major
>         Attachments: BrannchPythonOperator.png, Screen Shot 2018-11-15 at 13.51.07.png, refactored.png
>
>
> We have a job that runs some data processing every hour. At the end of the day we need to run an aggregation over all data generated by the 'hourly' jobs, regardless of whether any 'hourly' job failed. For this purpose we have prepared a DAG that uses BranchPythonOperator to decide which 'hourly' job needs to run at a given time; when the task for hour 23 is done, we trigger the aggregation (downstream). For this to work regardless of the last 'hourly' task's status, the *'all_done'* trigger rule is set on the aggregation task.
> Unfortunately, this configuration works incorrectly, causing the aggregation task to run after every 'hourly' task, despite the fact that the aggregation task is set as downstream of 'task_for_hour-23' +only+:
> !BrannchPythonOperator.png!
> Here's sample code:
> {code:python}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from datetime import datetime
> import logging
>
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
>
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
>
> # Returns the current hour
> def get_current_hour():
>     return datetime.now().hour
>
> # Returns the id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
>
> # Runs the hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
>
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
>
> # 'branch' returns the name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
>
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
>
> start_task.set_downstream(hour_branching)
>
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
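[Editor's note: the reported behavior is consistent with how trigger rules evaluate upstream states. BranchPythonOperator marks the unselected branches as skipped, and 'all_done' counts a skipped upstream as finished, so the aggregation task fires every hour. A minimal toy model of that evaluation logic, for illustration only and not Airflow's actual implementation:]

```python
# Toy model of trigger-rule evaluation over upstream task states.
# Illustrative sketch only -- not Airflow's real scheduler code.

# States in which a task instance counts as finished ("done").
DONE_STATES = {"success", "failed", "skipped", "upstream_failed"}

def all_done(upstream_states):
    """'all_done': fire once every upstream finished, in ANY state."""
    return all(state in DONE_STATES for state in upstream_states)

# The aggregation task's only upstream is task_for_hour-23.
# When the branch picks another hour, task_for_hour-23 is skipped:
print(all_done(["skipped"]))   # True  -> aggregation runs anyway
print(all_done(["success"]))   # True  -> aggregation runs (hour 23 ran)
print(all_done(["running"]))   # False -> still waiting
```

Because 'skipped' is a terminal state, 'all_done' is satisfied on every hourly run, matching the attached screenshot; the rule behaves as documented, which fits the issue's "Feedback Received" resolution.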