Date: Wed, 19 Jul 2017 13:05:00 +0000 (UTC)
From: "Conrad Lee (JIRA)"
To: commits@airflow.incubator.apache.org
Subject: [jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state

[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Conrad Lee updated AIRFLOW-1428:
--------------------------------

Description:

In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules. It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock.

Consider the following simple example:

{code}
# imports added for completeness; assumes a `dag` object is already defined
from airflow.exceptions import AirflowSkipException
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator


def raise_skip():
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}

When I run the code above, the DagRun deadlocks. I have dug into why:

* The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293].
* That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}}.
* {{no_dependencies_met}} is {{True}} because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")]

So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} and its parent is skipped, the scheduler can't find any tasks that are ready for execution. The DagRun is then marked as failed (due to deadlock).

I have looked for a trigger rule that would treat skipped parents as not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simple DAG.

It seems that the Airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called {{ALL_SUCCESS_OR_SKIPPED}}, but nobody has implemented it.
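To make the proposal concrete, here is a plain-Python sketch (not Airflow code; the function name and return values are hypothetical) of how a trigger rule like {{ALL_SUCCESS_OR_SKIPPED}} might decide a task's fate from the same {{upstream_tasks_state}} counts that appear in the TIDepStatus reason above, compared with today's {{all_success}} behavior:

```python
# Hypothetical sketch, not part of Airflow: models how a trigger rule could
# map upstream state counts to an outcome for one downstream task instance.
def evaluate_trigger_rule(rule, states, upstream_count):
    """Return 'wait', 'run', 'skip', or 'blocked' for a task instance.

    states -- counts like {'successes': 0, 'skipped': 1, 'failed': 0,
              'upstream_failed': 0, 'done': 1}
    upstream_count -- number of direct upstream tasks
    """
    if states['done'] < upstream_count:
        return 'wait'  # some parents have not finished yet
    if rule == 'all_success':
        # Today's behavior: a skipped parent means the dependency never
        # passes, so the task is neither run nor failed -- and with no
        # runnable tasks left, the DagRun is declared deadlocked.
        return 'run' if states['successes'] == upstream_count else 'blocked'
    if rule == 'all_success_or_skipped':  # the proposed, unimplemented rule
        if states['failed'] or states['upstream_failed']:
            return 'blocked'
        if states['skipped']:
            return 'skip'  # propagate the skip to the child instead
        return 'run'
    raise ValueError('unknown trigger rule: %s' % rule)


# The exact state reported in get_failed_dep_statuses() above:
reported = {'successes': 0, 'skipped': 1, 'failed': 0,
            'upstream_failed': 0, 'done': 1}
assert evaluate_trigger_rule('all_success', reported, 1) == 'blocked'
assert evaluate_trigger_rule('all_success_or_skipped', reported, 1) == 'skip'
```

With the reported upstream state, {{all_success}} leaves the child permanently blocked (the observed deadlock), while the hypothetical rule would mark it skipped, which is exactly the behavior wanted for {{skipped_child_op}}.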
> DagRun deadlocks when all tasks' dependencies have skipped state
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-1428
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1428
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun, dependencies
>    Affects Versions: 1.8.2
>         Environment: LocalExecutor with postgres
>            Reporter: Conrad Lee
>            Assignee: Bolke de Bruin
>            Priority: Critical
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)