airflow-users mailing list archives

From Avi Levi <a...@theneura.com>
Subject Re: Create dependencies between emr steps
Date Thu, 03 Jun 2021 19:09:44 GMT
Thanks Daniel.
I am pretty new to Airflow. Your suggestions sound great, but could you
point me to some implementation examples?

Best
Avi

On Thu, Jun 3, 2021 at 11:46 AM Daniel Standish <dpstandish@gmail.com>
wrote:

>> So do I have to go with option 1? Meaning, add a step_checker after each
>> step to verify completion?
>
>
> One thing you can do to make this less onerous is subclass
> EmrAddStepsOperator and add a property that produces the sensor it needs.
>
> So you would have something like
>
> task1 = EmrAddStepsOperator(...)
> task1_sensor = task1.sensor_task
> task1 >> task1_sensor
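A minimal, self-contained sketch of that pattern. The stub base classes keep it runnable outside Airflow, and the names `EmrAddStepsWithSensorOperator` / `sensor_task` are hypothetical, not part of the Airflow API; a real DAG would subclass the actual EmrAddStepsOperator and return a real EmrStepSensor:

```python
# Sketch of the "operator that builds its own sensor" idea.
# The two stub classes below stand in for the real Airflow
# operator/sensor so the pattern can be read and run on its own.

class EmrAddStepsOperator:  # stand-in for the Airflow operator
    def __init__(self, task_id, job_flow_id=None,
                 aws_conn_id="aws_default", steps=None):
        self.task_id = task_id
        self.job_flow_id = job_flow_id
        self.aws_conn_id = aws_conn_id
        self.steps = steps or []


class EmrStepSensor:  # stand-in for the Airflow sensor
    def __init__(self, task_id, job_flow_id=None, step_id=None,
                 aws_conn_id="aws_default"):
        self.task_id = task_id
        self.job_flow_id = job_flow_id
        self.step_id = step_id
        self.aws_conn_id = aws_conn_id


class EmrAddStepsWithSensorOperator(EmrAddStepsOperator):
    """EmrAddStepsOperator that can produce its own step sensor."""

    @property
    def sensor_task(self):
        # The templated step_id pulls the step list this task pushed
        # to XCom and watches the first step in it.
        template = "{{ task_instance.xcom_pull(task_ids='%s')[0] }}"
        return EmrStepSensor(
            task_id="%s_sensor" % self.task_id,
            job_flow_id=self.job_flow_id,
            step_id=template % self.task_id,
            aws_conn_id=self.aws_conn_id,
        )
```

In the DAG you would then write `task1 = EmrAddStepsWithSensorOperator(...)` followed by `task1 >> task1.sensor_task`, so each add-steps task carries its watcher with it.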
>
> Another option is to create an EmrAddStepsSynchronousOperator
> and do the waiting in the same task.
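The waiting half of such a synchronous operator could poll EMR until every submitted step reaches a terminal state. This is only a sketch under assumptions: `wait_for_steps` and `get_step_state` are hypothetical names, and in practice `get_step_state` would wrap a real EMR status lookup (e.g. boto3's `describe_step` response):

```python
import time

# EMR step states that mean the step is finished, one way or another.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_steps(step_ids, get_step_state, poke_interval=30,
                   sleep=time.sleep):
    """Block until every step in step_ids reaches a terminal state.

    get_step_state(step_id) should return the step's current EMR
    state string; sleep is injectable so the loop is testable.
    Raises RuntimeError if any step ends in a non-COMPLETED state.
    """
    for step_id in step_ids:
        while True:
            state = get_step_state(step_id)
            if state in TERMINAL_STATES:
                if state != "COMPLETED":
                    raise RuntimeError(
                        "EMR step %s ended in state %s" % (step_id, state))
                break  # this step is done, move to the next one
            sleep(poke_interval)
```

An `EmrAddStepsSynchronousOperator.execute()` could then call `super().execute(context)` to submit the steps and pass the returned step ids to `wait_for_steps()` before returning, so downstream tasks only start once the steps have actually completed.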
>
>
> On Thu, Jun 3, 2021 at 7:46 AM Avi Levi <avil@theneura.com> wrote:
>
>> Thanks Tom,
>> But this way there is no dependency between the steps, right? Here you
>> are just verifying that they are both completed. However, I do want step2
>> to depend on step1 completing successfully.
>> So do I have to go with option 1? Meaning, add a step_checker after each
>> step to verify completion?
>> option 1:  start_pipeline >> create_emr_cluster >> step1 >>
>> step1_watcher >> step2 >> step2_watcher >> terminate
>>
>> option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>> step2_watcher >> terminate
>>
>> On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korrison@concirrus.com>
>> wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> I only have one add_step task but a step_sensor for each step added.
>>>
>>>
>>>
>>> e.g.
>>>
>>>
>>>
>>> start_daily_pipeline = DummyOperator(
>>>     task_id="start_daily_pipeline",
>>>     dag=dag
>>> )
>>>
>>> cluster_creator = EmrCreateJobFlowOperator(
>>>     task_id="create_job_flow",
>>>     aws_conn_id="aws_role_default",
>>>     emr_conn_id="emr_default",
>>>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>>>     dag=dag,
>>> )
>>>
>>> step_adder = EmrAddStepsOperator(
>>>     task_id="add_steps",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     steps=SPARK_STEPS,
>>>     dag=dag,
>>> )
>>>
>>> step1_checker = EmrStepSensor(
>>>     task_id="watch_step_1",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_checker = EmrStepSensor(
>>>     task_id="watch_step_2",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> job_flow_checker = EmrJobFlowSensor(
>>>     task_id="watch_job_flow",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> cluster_remover = EmrTerminateJobFlowOperator(
>>>     task_id="remove_cluster",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>>
>>> start_daily_pipeline >> cluster_creator >> step_adder
>>> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
>>>
>>>
>>>
>>>
>>>
>>> *From:* Avi Levi <avil@theneura.com>
>>> *Sent:* 03 June 2021 08:21
>>> *To:* users@airflow.apache.org
>>> *Subject:* Create dependencies between emr steps
>>>
>>>
>>>
>>> Hi,
>>>
>>> How can I create dependencies between EMR steps? Do I need to create a
>>> step watcher between each one, like below (option 1), or can they depend
>>> on each other directly without the step_watcher (option 2)? Meaning
>>> something like this:
>>>
>>>
>>>
>>> step1 = EmrAddStepsOperator(
>>>     task_id="step1",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP1,
>>>     dag=dag,
>>> )
>>>
>>> step2 = EmrAddStepsOperator(
>>>     task_id="step2",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP2,
>>>     dag=dag,
>>> )
>>>
>>> step1_watcher = EmrStepSensor(
>>>     task_id="step1_watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='step1', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_watcher = EmrStepSensor(
>>>     task_id="step2_watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='step2', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>>
>>>
>>> option 1:  start_pipeline >> create_emr_cluster >> step1 >>
>>> step1_watcher >> step2 >> step2_watcher >> terminate
>>>
>>>
>>>
>>> option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>>> step2_watcher >> terminate
>>>
>>>
>>>
>>
