From: Avi Levi <avil@theneura.com>
Date: Thu, 3 Jun 2021 14:09:44 -0500
Subject: Re: Create dependencies between emr steps
To: users@airflow.apache.org

Thanks Daniel.
I am pretty new to Airflow. Your suggestions sound great, but could you add a reference to some implementation examples?

Best
Avi

On Thu, Jun 3, 2021 at 11:46 AM Daniel Standish wrote:

>> So do I have to go with option 1? Meaning, add a step_checker after each
>> step to verify completion.
>
> One thing you can do to make this less onerous is subclass
> EmrAddStepsOperator and add a property that produces the sensor it needs.
>
> So you would have something like:
>
>     task1 = EmrAddStepsOperator(...)
>     task1_sensor = task1.sensor_task
>     task1 >> task1_sensor
>
> Another option: you can create an EmrAddStepsSynchronousOperator and do
> the waiting in the same task.
>
> On Thu, Jun 3, 2021 at 7:46 AM Avi Levi wrote:
>
>> Thanks Tom,
>> But this way there is no dependency between the steps, right? Here you
>> are just verifying that they both completed. However, I do want step2 to
>> depend on step1 completing successfully.
>> So do I have to go with option 1? Meaning, add a step_checker after each
>> step to verify completion.
>>
>> option 1: start_pipeline >> create_emr_cluster >> step1 >>
>> step1_watcher >> step2 >> step2_watcher >> terminate
>>
>> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>> step2_watcher >> terminate
>>
>> On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison wrote:
>>
>>> Hi,
>>>
>>> I only have one add_step task but a step_sensor for each step added.
>>>
>>> e.g.
>>> start_daily_pipeline = DummyOperator(
>>>     task_id="start_daily_pipeline",
>>>     dag=dag,
>>> )
>>>
>>> cluster_creator = EmrCreateJobFlowOperator(
>>>     task_id="create_job_flow",
>>>     aws_conn_id="aws_role_default",
>>>     emr_conn_id="emr_default",
>>>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>>>     dag=dag,
>>> )
>>>
>>> step_adder = EmrAddStepsOperator(
>>>     task_id="add_steps",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     steps=SPARK_STEPS,
>>>     dag=dag,
>>> )
>>>
>>> step1_checker = EmrStepSensor(
>>>     task_id="watch_step_1",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_checker = EmrStepSensor(
>>>     task_id="watch_step_2",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> job_flow_checker = EmrJobFlowSensor(
>>>     task_id="watch_job_flow",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> cluster_remover = EmrTerminateJobFlowOperator(
>>>     task_id="remove_cluster",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> start_daily_pipeline >> cluster_creator >> step_adder
>>> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
>>>
>>> From: Avi Levi <avil@theneura.com>
>>> Sent: 03 June 2021 08:21
>>> To: users@airflow.apache.org
>>> Subject: Create dependencies between emr steps
>>>
>>> Hi,
>>>
>>> How can I create dependencies between EMR steps? Do I need to create a
>>> step watcher between each one like below (option 1), or can they be
>>> dependent directly without the step_watcher (option 2)? Meaning
>>> something like this:
>>>
>>> step1 = EmrAddStepsOperator(
>>>     task_id="step1",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP1,
>>>     dag=dag,
>>> )
>>>
>>> step2 = EmrAddStepsOperator(
>>>     task_id="step2",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP2,
>>>     dag=dag,
>>> )
>>>
>>> step1_watcher = EmrStepSensor(
>>>     task_id="step_1watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_watcher = EmrStepSensor(
>>>     task_id="step_2watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>> option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate
>>>
>>> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate
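[Attached example] A minimal sketch of the subclassing idea from Daniel's reply, since the thread never shows one. `EmrAddStepsWithSensorOperator` and the `sensor_task` property name are illustrative (the property name comes from Daniel's message, not from the Amazon provider), and lightweight stand-in base classes are used so the pattern reads on its own; a real DAG would subclass `EmrAddStepsOperator` and return the real `EmrStepSensor` from `airflow.providers.amazon.aws`.

```python
class BaseOperator:
    """Stand-in for airflow.models.BaseOperator."""
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        self.kwargs = kwargs


class EmrStepSensor(BaseOperator):
    """Stand-in for the provider's EmrStepSensor."""


class EmrAddStepsWithSensorOperator(BaseOperator):
    """Add-steps operator that knows how to build its own step sensor."""

    def __init__(self, task_id, job_flow_id, steps,
                 aws_conn_id="aws_default", **kwargs):
        super().__init__(task_id, **kwargs)
        self.job_flow_id = job_flow_id
        self.steps = steps
        self.aws_conn_id = aws_conn_id

    @property
    def sensor_task(self):
        # The sensor pulls this task's returned step id via XCom, so the
        # operator/sensor pairing never has to be written out by hand.
        return EmrStepSensor(
            task_id=f"{self.task_id}_sensor",
            job_flow_id=self.job_flow_id,
            step_id=(
                "{{ task_instance.xcom_pull(task_ids='%s', "
                "key='return_value')[0] }}" % self.task_id
            ),
            aws_conn_id=self.aws_conn_id,
        )


step1 = EmrAddStepsWithSensorOperator(
    task_id="step1", job_flow_id="j-DEMO", steps=[{"Name": "demo"}])
step1_sensor = step1.sensor_task
print(step1_sensor.task_id)  # step1_sensor
```

In a DAG this gives the `task1 >> task1.sensor_task` wiring from Daniel's message, with each sensor's templated `step_id` derived from its own operator's `task_id`.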
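[Attached example] And a sketch of the second suggestion, a synchronous add-and-wait operator that submits the EMR step and polls in the same task. The helper below uses boto3-style `add_job_flow_steps`/`describe_step` calls; the `FakeEmrClient` is a hypothetical in-memory stub so the flow can be exercised without AWS. Inside Airflow this logic would live in the custom operator's `execute()`, with the client obtained from the Amazon provider's EMR hook.

```python
import time


class EmrStepFailedError(Exception):
    """Raised when a step ends in a non-successful terminal state."""


def add_step_and_wait(emr_client, job_flow_id, step, poke_interval=30):
    """Submit one EMR step, then block until it completes or fails."""
    resp = emr_client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=[step])
    step_id = resp["StepIds"][0]
    while True:
        state = emr_client.describe_step(
            ClusterId=job_flow_id, StepId=step_id
        )["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return step_id
        if state in ("FAILED", "CANCELLED", "INTERRUPTED"):
            raise EmrStepFailedError(f"step {step_id} ended in state {state}")
        time.sleep(poke_interval)


class FakeEmrClient:
    """Tiny stand-in client that replays a scripted sequence of states."""

    def __init__(self, states):
        self._states = iter(states)

    def add_job_flow_steps(self, JobFlowId, Steps):
        return {"StepIds": ["s-1"]}

    def describe_step(self, ClusterId, StepId):
        return {"Step": {"Status": {"State": next(self._states)}}}


step_id = add_step_and_wait(
    FakeEmrClient(["PENDING", "COMPLETED"]), "j-DEMO",
    {"Name": "demo"}, poke_interval=0)
print(step_id)  # s-1
```

The trade-off versus the sensor approach: one task per step keeps the DAG smaller, but the worker slot stays occupied for the whole step duration, which is exactly what the separate sensor tasks avoid.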