airflow-dev mailing list archives

From Alek Storm <alek.st...@gmail.com>
Subject Re: Question about skipping, state propagation, and trigger rules.
Date Fri, 13 Oct 2017 20:25:03 GMT
Hi Daniel,

If you don’t find it too unwieldy, the following should satisfy your use
case. It basically converts what I assume was your use of
ShortCircuitOperator into BranchPythonOperators paired with dummy skip and
join tasks. Try running it with the environment variable FOO_SKIP=true to
see the other possible path. Let me know if you have any questions.

from datetime import datetime
import os

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 1, 1),
    'concurrency': 16,
}
with DAG('foo', default_args=default_args, schedule_interval='@once') as dag:
    sensor_dataA = BranchPythonOperator(
        task_id='sensor_dataA',
        python_callable=lambda: 'skipA'
            if os.environ.get('FOO_SKIP', 'false') == 'true'
            else 'preprocess_and_stage_dataA')
    sensor_dataB = BranchPythonOperator(
        task_id='sensor_dataB',
        python_callable=lambda: 'skipB'
            if os.environ.get('FOO_SKIP', 'false') == 'true'
            else 'preprocess_and_stage_dataB')
    sensor_dataC = BranchPythonOperator(
        task_id='sensor_dataC',
        python_callable=lambda: 'skipC'
            if os.environ.get('FOO_SKIP', 'false') == 'true'
            else 'preprocess_and_stage_dataC')

    preprocess_and_stage_dataA = BashOperator(
        task_id='preprocess_and_stage_dataA',
        bash_command='echo {{ti.task_id}}')
    preprocess_and_stage_dataB = BashOperator(
        task_id='preprocess_and_stage_dataB',
        bash_command='echo {{ti.task_id}}')
    preprocess_and_stage_dataC = BashOperator(
        task_id='preprocess_and_stage_dataC',
        bash_command='echo {{ti.task_id}}')

    skipA = DummyOperator(
        task_id='skipA')
    skipB = DummyOperator(
        task_id='skipB')
    skipC = DummyOperator(
        task_id='skipC')

    joinA = DummyOperator(
        task_id='joinA',
        trigger_rule='one_success')
    joinB = DummyOperator(
        task_id='joinB',
        trigger_rule='one_success')
    joinC = DummyOperator(
        task_id='joinC',
        trigger_rule='one_success')

    process_stagesABC = BashOperator(
        task_id='process_stagesABC',
        bash_command='echo {{ti.task_id}}')

    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='echo {{ti.task_id}}')

    sensor_dataA >> preprocess_and_stage_dataA
    sensor_dataA >> skipA
    sensor_dataB >> preprocess_and_stage_dataB
    sensor_dataB >> skipB
    sensor_dataC >> preprocess_and_stage_dataC
    sensor_dataC >> skipC

    preprocess_and_stage_dataA >> joinA
    skipA >> joinA
    preprocess_and_stage_dataB >> joinB
    skipB >> joinB
    preprocess_and_stage_dataC >> joinC
    skipC >> joinC

    joinA >> process_stagesABC
    joinB >> process_stagesABC
    joinC >> process_stagesABC

    process_stagesABC >> cleanup
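
As an aside, the three near-identical lambdas could be factored into one
helper. This is just a hypothetical sketch (choose_branch is not part of the
DAG above), showing the branch decision in isolation: given a suffix like
'A', it returns the task_id the BranchPythonOperator should follow.

```python
import os

def choose_branch(suffix):
    # Skip the preprocess task when the FOO_SKIP env var is set to 'true';
    # otherwise follow the normal preprocess-and-stage path.
    if os.environ.get('FOO_SKIP', 'false') == 'true':
        return 'skip' + suffix
    return 'preprocess_and_stage_data' + suffix
```

You would then pass e.g. python_callable=lambda: choose_branch('A') to each
sensor, which keeps the skip/preprocess naming convention in one place.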

Best,
Alek

On Thu, Oct 12, 2017 at 1:43 AM, Daniel Lamblin [Data Science & Platform
Center] <lamblin@coupang.com> wrote:

> I hope this is an alright place to ask the following:
> In a case where some inputs will irregularly be missing, but where it's
> okay, I was reading
> https://airflow.incubator.apache.org/concepts.html#trigger-rules
> and I thought I needed `all_done` for a final task, but a skip is not a
> done state, nor does it (seem to) propagate.
> Is there a way to trigger something after all upstreams are either
> successful or skipped?
>
> My case looks a little like:
> sensor_dataA >> preprocess_and_stage_dataA >> process_stagesABC >> clean_up
> sensor_dataB >> preprocess_and_stage_dataB >> process_stagesABC
> sensor_dataC >> preprocess_and_stage_dataC >> process_stagesABC
>
> I don't want the preprocess to fail because the data isn't there and there
> will be side-effects, but if the sensor skips its associated
> preprocess_and_stage is not queued. The task doesn't seem to have any state
> (like `upstream_skipped`?) so process_stagesABC won't be triggered by
> `all_done`. `one_success` seems like it would be perfect except that it
> would start before all preprocess tasks have been either run or skipped.
>
> Am I missing a way that this can be done? Is there some general guide to
> changing the DAG structure that would handle completing the process? Am I
> supposed to be using XCOM here?
>
> If all these answers are "no/maybe" then is there some opportunity to
> introduce an `upstream_skipped` state or a different `trigger_rule`... a
> kludgy `SkipAheadOperator`, or something?
>
> Thanks,
> -Daniel Lamblin
>
