airflow-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tom Korrison <tom.korri...@concirrus.com>
Subject RE: Create dependencies between emr steps
Date Thu, 03 Jun 2021 10:19:59 GMT
Hi,

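I have only one add_steps task, plus one step sensor for each step it adds. With the default step concurrency of 1, EMR runs the steps submitted to a cluster one after another, so the sensors don't need dependencies between them.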

e.g.

# Single-submission pattern: one EmrAddStepsOperator submits every step in
# SPARK_STEPS, and one EmrStepSensor per step watches it. The add_steps task's
# return_value XCom is indexed per step below ([0], [1]), i.e. it holds the
# submitted step IDs in submission order.
#
# NOTE: each Jinja template string must stay on a single line. The original
# mail wrapped two of them, which leaves an unterminated string literal.
start_daily_pipeline = DummyOperator(
    task_id="start_daily_pipeline",
    dag=dag,
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    aws_conn_id="aws_role_default",
    emr_conn_id="emr_default",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag,
)

# Submits all of SPARK_STEPS to the cluster created by create_job_flow.
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    steps=SPARK_STEPS,
    dag=dag,
)

# Watches the first step submitted by add_steps.
step1_checker = EmrStepSensor(
    task_id="watch_step_1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# Watches the second step submitted by add_steps.
step2_checker = EmrStepSensor(
    task_id="watch_step_2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# EmrJobFlowSensor watches the whole cluster and takes no step_id. Airflow 2
# raises on unknown operator arguments, so the former step_id was removed.
# NOTE(review): with its default target states this sensor waits for the
# cluster itself to terminate. If JOB_FLOW_OVERRIDES keeps the cluster alive
# when idle, this task will not succeed and remove_cluster never runs. Confirm
# against JOB_FLOW_OVERRIDES.
job_flow_checker = EmrJobFlowSensor(
    task_id="watch_job_flow",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)


# The step sensors run in parallel. Ordering between the steps themselves is
# left to EMR, which received them in one submission.
start_daily_pipeline >> cluster_creator >> step_adder
step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover


From: Avi Levi <avil@theneura.com>
Sent: 03 June 2021 08:21
To: users@airflow.apache.org
Subject: Create dependencies between emr steps

Hi,
How can I create dependencies between EMR steps? Do I need a step watcher between each pair of steps, as in option 1 below? Or can the steps depend on each other directly without a step watcher, as in option 2? That is, something like this:

# Two-submission pattern: each EmrAddStepsOperator submits its own step list,
# and its return_value XCom holds the IDs of the steps it added. The watchers
# index [0] into it, so each watcher must pull from the task that added the
# step it watches.
#
# NOTE: each Jinja template string must stay on a single line. Wrapping it
# leaves an unterminated string literal.
step1 = EmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)

step2 = EmrAddStepsOperator(
    task_id="step2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP2,
    dag=dag,
)

step1_watcher = EmrStepSensor(
    task_id="step_1watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    # The step was added by task "step1"; no "add_steps" task exists here.
    step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

step2_watcher = EmrStepSensor(
    task_id="step_2watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    # Pull from "step2". Reusing step 1's ID would make this watch the wrong step.
    step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate

option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate

Mime
View raw message