airflow-dev mailing list archives

From z...@vo.yoo.ro
Subject Help reusing filepath from previous task
Date Fri, 08 Jul 2016 17:58:39 GMT
Hello.

I'd like to reuse a value produced by one task to feed subsequent S3KeySensor 
checks, but after trying several ideas I have not been able to make this work.

Basically, my current DAG is as follows:

from datetime import timedelta

from airflow.operators import BashOperator
from airflow.operators.sensors import S3KeySensor

copy_data = BashOperator(
    task_id='copy_data',
    # this script copies data to S3
    bash_command='do_stuff.sh',
    dag=dag)

s3_path = 'xxxx'  # S3 key prefix (redacted here)
for data_type in ("fileA", "fileB", "fileC"):
    S3Sensor_task = S3KeySensor(
        task_id='check_' + data_type,
        poke_interval=20,
        timeout=60,
        retry_delay=timedelta(seconds=30),
        bucket_key=s3_path + data_type,
        bucket_name='xxxx',
        s3_conn_id='s3_conn_id',
        dag=dag)
    S3Sensor_task.set_upstream(copy_data)

This works great, but I'd rather avoid duplicating the S3 path in both the DAG 
and the do_stuff.sh script.
So I thought of pushing the S3 path from do_stuff.sh into an XCom variable, but 
then I need access to that value inside my for loop.
I've also tried a SubDAG, but once again I did not manage to read the XCom.
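
For what it's worth, here is a rough sketch of what I had in mind (untested). 
It assumes that BashOperator's xcom_push flag pushes the last line of the 
script's stdout as an XCom, and that bucket_key is a templated field on 
S3KeySensor, so a Jinja xcom_pull could resolve the path at run time:

copy_data = BashOperator(
    task_id='copy_data',
    # do_stuff.sh would print the S3 key prefix as its last line of output
    bash_command='do_stuff.sh',
    xcom_push=True,
    dag=dag)

for data_type in ("fileA", "fileB", "fileC"):
    S3Sensor_task = S3KeySensor(
        task_id='check_' + data_type,
        # pull the prefix pushed by copy_data; rendered when the task runs
        bucket_key="{{ ti.xcom_pull(task_ids='copy_data') }}" + data_type,
        bucket_name='xxxx',
        s3_conn_id='s3_conn_id',
        poke_interval=20,
        timeout=60,
        retry_delay=timedelta(seconds=30),
        dag=dag)
    S3Sensor_task.set_upstream(copy_data)

If bucket_key really is templated, the for loop itself would never need the 
XCom value, since only the per-file suffix changes in Python and the prefix 
would be filled in at run time.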

Could someone give me some hints?

Regards
