airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Help reusing filepath from previous task
Date Mon, 11 Jul 2016 03:16:08 GMT
You could:

copy_data = BashOperator(
    task_id='copy_data',
    bash_command='do_stuff.sh',  # .sh file is rendered as a Jinja template
    params={'s3_path': s3_path},  # s3_path must be defined above this task
    dag=dag)

then reference {{ params.s3_path }} as a templated element in do_stuff.sh
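Because bash_command ends in .sh, Airflow treats it as a template file and renders it with Jinja2 before running it, so {{ params.s3_path }} is replaced by the value passed in params. The snippet below is only a stdlib stand-in that mimics that one substitution (it is not Airflow's code, and it handles nothing but the {{ params.<name> }} form); the script line and the path value are hypothetical.

```python
import re

# Matches placeholders such as {{ params.s3_path }} or {{params.s3_path}}.
PARAMS_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(template, params):
    """Substitute {{ params.<name> }} placeholders using the params dict.

    Simplified, stdlib-only stand-in for the Jinja2 rendering Airflow
    applies to a templated .sh script; real Jinja supports far more syntax.
    """
    return PARAMS_PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), template)


# Hypothetical line from do_stuff.sh and a hypothetical path value.
script = 'echo "copying data to {{ params.s3_path }}"\n'
print(render_params(script, {"s3_path": "s3://my-bucket/exports/"}), end="")
# prints: echo "copying data to s3://my-bucket/exports/"
```

The point is that the script itself no longer hard-codes the path; it only names the parameter, and the DAG file supplies the value.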

Max

On Fri, Jul 8, 2016 at 1:58 PM, <ziqr@vo.yoo.ro> wrote:

> Hello.
>
> I'd like to reuse information produced by one task to feed the subsequent
> S3KeySensor checks, but after trying several ideas I haven't been able to
> make it work.
>
> Basically, my current DAG is as follows:
>
> copy_data = BashOperator(
>     task_id='copy_data',
>     # this script copies data to S3
>     bash_command='do_stuff.sh',
>     dag=dag)
>
> s3_path = 'xxxx'  # placeholder
> for data_type in ("fileA", "fileB", "fileC"):
>     S3Sensor_task = S3KeySensor(
>         task_id='check_' + data_type,
>         poke_interval=20,
>         timeout=60,
>         retry_delay=timedelta(seconds=30),
>         bucket_key=s3_path + data_type,
>         bucket_name='xxxx',
>         s3_conn_id='s3_conn_id',
>         dag=dag)
>     S3Sensor_task.set_upstream(copy_data)
>
> This works great, but I'd rather avoid duplicating the S3 path in both the
> DAG and the do_stuff.sh script.
> So I thought of pushing the S3 path from do_stuff.sh to XCom, but then I
> need access to those values inside my for loop.
> I also tried a SubDAG, but again I couldn't manage to read the XCom values.
>
> Could someone give me some hints?
>
> Regards
>
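Applied to the loop in the question, the suggestion means the path lives in one place in the DAG file and is fed both to copy_data (via params) and to every sensor's bucket_key. The plain-Python sketch below, which leaves out Airflow entirely, shows only that single-source-of-truth idea; the helper name build_task_config and the path value are hypothetical.

```python
# Hypothetical values: the original message redacts the real path.
S3_PATH = "path/to/exports/"
DATA_TYPES = ("fileA", "fileB", "fileC")


def build_task_config(s3_path, data_types):
    """Derive the BashOperator params and every S3KeySensor bucket_key
    from a single s3_path, so the path is written down only once."""
    bash_params = {"s3_path": s3_path}
    # Same task_id and bucket_key scheme as the loop in the question.
    sensor_keys = {"check_" + dt: s3_path + dt for dt in data_types}
    return bash_params, sensor_keys


bash_params, sensor_keys = build_task_config(S3_PATH, DATA_TYPES)
print(bash_params)
for task_id, key in sensor_keys.items():
    print(task_id, key)
```

In the DAG file, bash_params would be passed as params= to copy_data and each key as bucket_key= to the matching S3KeySensor, while do_stuff.sh reads the same value through {{ params.s3_path }}.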