airflow-users mailing list archives

From Daniel Standish <dpstand...@gmail.com>
Subject Re: Create dependencies between emr steps
Date Thu, 03 Jun 2021 20:14:41 GMT
Well, for the first one, something like this:

from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

STEP1 = None  # placeholder
dag = None  # placeholder


class MyEmrAddStepsOperator(EmrAddStepsOperator):
    def __init__(self, create_cluster_task_id, **kwargs):
        self.create_cluster_task_id = create_cluster_task_id
        super().__init__(**kwargs)

    @property
    def step_sensor_task(self):
        # note the quotes around the task ids inside xcom_pull -- the
        # rendered Jinja expression needs string literals there
        return EmrStepSensor(
            task_id=f"{self.task_id}.sensor",
            job_flow_id=f"{{{{ task_instance.xcom_pull('{self.create_cluster_task_id}', key='return_value') }}}}",
            step_id=f"{{{{ task_instance.xcom_pull('{self.task_id}', key='return_value')[0] }}}}",
            aws_conn_id=self.aws_conn_id,
            dag=dag,
        )


step_1_task = MyEmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
    create_cluster_task_id='create_emr_cluster',
)

step_1_sensor = step_1_task.step_sensor_task

step_1_task >> step_1_sensor

step_1_sensor = step_1_task.step_sensor_task

step_1_task >> step_1_sensor
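
Since the doubled braces are easy to get wrong, here is a quick standalone check (plain Python, no Airflow needed) of how the f-string in step_sensor_task renders; task_id here is just a stand-in value:

```python
# The f-string's doubled braces ({{{{ / }}}}) collapse to literal {{ / }},
# producing a normal Jinja template that Airflow evaluates at run time.
# Note the single quotes around the task id inside xcom_pull -- without
# them Jinja would treat step1 as an undefined variable.
task_id = "step1"
step_id_template = f"{{{{ task_instance.xcom_pull('{task_id}', key='return_value')[0] }}}}"
print(step_id_template)
# {{ task_instance.xcom_pull('step1', key='return_value')[0] }}
```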

You could even contain them within a task group (search for "Airflow task
groups") to make them look like one task in the UI. That's _mostly_
cosmetic, but with task groups you can also set dependencies between the
groups themselves, which in your case might be convenient.
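
A sketch of that, assuming Airflow 2.x (where TaskGroup is available); step1_group and the `...` placeholders are illustrative, not a complete DAG:

```
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id="step1_group", dag=dag) as step1_group:
    step_1_task = MyEmrAddStepsOperator(...)  # as defined above
    step_1_sensor = step_1_task.step_sensor_task
    step_1_task >> step_1_sensor

# the group then behaves like a single node for dependency purposes:
# create_emr_cluster >> step1_group >> step2_group
```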

On the topic of combining them into one task, one way you could do it is to
copy the polling / waiting logic from the sensor and put it in a subclass
like this:


class MyOtherEmrAddStepsOperator(EmrAddStepsOperator):
    def wait_step(self):
        """put wait logic here"""

    def execute(self, context):
        super().execute(context)
        self.wait_step()
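
To make that concrete, here is a rough sketch of what the wait logic could look like, adapted from what EmrStepSensor does internally (poll describe_step until a terminal state). The function name wait_for_step, the poll_interval/timeout parameters, and the injected describe_step callable are my own illustration names, not Airflow API; inside the operator you would use the real describe_step from a boto3 EMR client obtained via the task's AWS connection.

```python
import time


def wait_for_step(describe_step, job_flow_id, step_id, poll_interval=30, timeout=3600):
    """Poll a describe_step-style callable until the EMR step reaches a
    terminal state. Raises if the step fails or the timeout elapses."""
    terminal = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # same response shape as boto3's emr.describe_step
        state = describe_step(ClusterId=job_flow_id, StepId=step_id)["Step"]["Status"]["State"]
        if state in terminal:
            if state != "COMPLETED":
                raise RuntimeError(f"EMR step {step_id} ended in state {state}")
            return state
        time.sleep(poll_interval)
    raise TimeoutError(f"EMR step {step_id} did not finish within {timeout}s")
```

In the subclass, wait_step would build the client and call something like this with the operator's own job_flow_id and the step id returned by execute.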

On Thu, Jun 3, 2021 at 12:10 PM Avi Levi <avil@theneura.com> wrote:

> Thanks Daniel.
> I am pretty new to Airflow. Your suggestions sound great, but can you add
> a reference to some implementation examples?
>
> Best
> Avi
>
> On Thu, Jun 3, 2021 at 11:46 AM Daniel Standish <dpstandish@gmail.com>
> wrote:
>
>> So do i have to go on option 1 ? meaning add step_checker after each step
>>> to verify completion
>>
>>
>> One thing you can do to make this less onerous is subclass
>> EmrAddStepsOperator and add a property that produces the sensor it needs.
>>
>> So you would have something like
>>
>> task1  = EmrAddStepsOperator(...)
>> task1_sensor = task1.sensor_task
>> task1 >> task1_sensor
>>
>> Another option you have is to create an EmrAddStepsSynchronousOperator
>> and do the waiting in the same task.
>>
>>
>> On Thu, Jun 3, 2021 at 7:46 AM Avi Levi <avil@theneura.com> wrote:
>>
>>> Thanks Tom,
>>> But this way there is no dependency between the steps, right? Here you
>>> are just verifying that they both completed. However, I do want step2
>>> to depend on step1 completing successfully.
>>> So do I have to go with option 1, meaning add a step_checker after each
>>> step to verify completion?
>>> option 1:  start_pipeline >> create_emr_cluster >> step1 >>
>>> step1_watcher >> step2 >> step2_watcher >> terminate
>>>
>>> option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>>> step2_watcher >> terminate
>>>
>>> On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korrison@concirrus.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I only have one add_step task but a step_sensor for each step added.
>>>>
>>>>
>>>>
>>>> e.g.
>>>>
>>>>
>>>>
>>>> start_daily_pipeline = DummyOperator(
>>>>     task_id="start_daily_pipeline",
>>>>     dag=dag
>>>> )
>>>>
>>>> cluster_creator = EmrCreateJobFlowOperator(
>>>>     task_id="create_job_flow",
>>>>     aws_conn_id="aws_role_default",
>>>>     emr_conn_id="emr_default",
>>>>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>>>>     dag=dag,
>>>> )
>>>>
>>>> step_adder = EmrAddStepsOperator(
>>>>     task_id="add_steps",
>>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>>     aws_conn_id="aws_role_default",
>>>>     steps=SPARK_STEPS,
>>>>     dag=dag,
>>>> )
>>>>
>>>> step1_checker = EmrStepSensor(
>>>>     task_id="watch_step_1",
>>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>>>>     aws_conn_id="aws_role_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>> step2_checker = EmrStepSensor(
>>>>     task_id="watch_step_2",
>>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>>>>     aws_conn_id="aws_role_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>> job_flow_checker = EmrJobFlowSensor(
>>>>     task_id="watch_job_flow",
>>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>>     aws_conn_id="aws_role_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>> cluster_remover = EmrTerminateJobFlowOperator(
>>>>     task_id="remove_cluster",
>>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>>     aws_conn_id="aws_role_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>>
>>>> start_daily_pipeline >> cluster_creator >> step_adder
>>>> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Avi Levi <avil@theneura.com>
>>>> *Sent:* 03 June 2021 08:21
>>>> *To:* users@airflow.apache.org
>>>> *Subject:* Create dependencies between emr steps
>>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> How can I create dependencies between EMR steps? Do I need to create a
>>>> step watcher between each one, as below (option 1), or can the steps
>>>> depend on each other directly without the step_watcher (option 2)?
>>>> Meaning something like this:
>>>>
>>>>
>>>>
>>>> step1 = EmrAddStepsOperator(
>>>>     task_id="step1",
>>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>>     aws_conn_id="aws_default",
>>>>     steps=STEP1,
>>>>     dag=dag,
>>>> )
>>>>
>>>> step2 = EmrAddStepsOperator(
>>>>     task_id="step2",
>>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>>     aws_conn_id="aws_default",
>>>>     steps=STEP2,
>>>>     dag=dag,
>>>> )
>>>>
>>>> step1_watcher = EmrStepSensor(
>>>>     task_id="step_1watcher",
>>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>>     step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
>>>>     aws_conn_id="aws_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>> step2_watcher = EmrStepSensor(
>>>>     task_id="step_2watcher",
>>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>>     step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
>>>>     aws_conn_id="aws_default",
>>>>     dag=dag,
>>>> )
>>>>
>>>>
>>>>
>>>> option 1:  start_pipeline >> create_emr_cluster >> step1 >>
>>>> step1_watcher >> step2 >> step2_watcher >> terminate
>>>>
>>>> option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>>>> step2_watcher >> terminate
>>>>
>>>>
>>>>
>>>
