airflow-dev mailing list archives

From MSR M <msrmaill...@gmail.com>
Subject Creating and accessing different variable values for each instance of DAG run
Date Tue, 12 Jul 2016 00:43:50 GMT
Hi,

I need some advice on solving a problem with local variables in a DAG.

I have a DAG with a schedule interval of 30 minutes. It has 3 tasks: t1 runs a
python program on a remote EC2 instance, t2 waits for an S3 file to become
available at a particular location (this S3 file is created by t1), and once
the file is available, t3 runs and processes it on S3.

I have a date-time as part of my S3 file location:

dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')

bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"

t1 runs for more than an hour, so by then a second instance of the DAG has
already started and changed the value of dttm2. As a result, task t2 of the
first run tries to locate the file at a different location.

To overcome this I am planning to use the {{ execution_date }} template
parameter instead of computing dttm2 as shown above.

In situations like these, is there a better approach to keeping the same value
in a variable throughout a particular run of the DAG?
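
For example, this is roughly what I have in mind (just a sketch; I am assuming
bucket_key is a templated field of S3KeySensor in my Airflow version, so the
key would render per run from execution_date):

# sketch: build the S3 key from execution_date so it is fixed for each DAG run
key_tmpl = "s3://aaaaa/bbbbb/{{ execution_date.strftime('%Y-%m-%d-%H-%M') }}/sucess"

wait_for_file = S3KeySensor(    # illustrative stand-in for my t2
    task_id='wait_for_file',
    bucket_key=key_tmpl,
    s3_conn_id='s3conn',
    wildcard_match=True,
    dag=dag)

# (the bash_command in t1 would use the same
#  {{ execution_date.strftime('%Y-%m-%d-%H-%M') }} macro instead of params)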

Or should I use the XCom feature to push and pull the value across the tasks,
with a different value for each run?
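
If XCom is the better route, I was thinking of something like the following
(a minimal sketch with a hypothetical push_dttm task, not my real DAG):

from airflow.operators.python_operator import PythonOperator

def _push_dttm(**context):
    # the return value is pushed to XCom; deriving it from execution_date
    # keeps it identical for every task in this particular DAG run
    return context['execution_date'].strftime('%Y-%m-%d-%H-%M')

push_dttm = PythonOperator(
    task_id='push_dttm',
    python_callable=_push_dttm,
    provide_context=True,
    dag=dag)

# a downstream bash_command would then pull the same value for its run:
#   python2 a11.py {{ ti.xcom_pull(task_ids='push_dttm') }}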


Part of my DAG is given below:

# (imports shown for completeness; the DAG object definition is omitted)
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import S3KeySensor

# evaluated every time the DAG file is parsed, not once per DAG run
dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')


NL = """
          cd /home/ubuntu/Scripts/ ; python2 a11.py {{params.get("dttm2")}}
;
"""

t1 = BashOperator(
    task_id='E_Ns_A',
    bash_command=NL,
    params={'dttm2':dttm2},
    retries=3,
    dag=dag)

bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"

def detect_s3(name, dag=dag, upstream=t1):
    # sensor that waits for the success file written by t1
    task = S3KeySensor(
        task_id=name,
        bucket_key=bucket_key2,
        s3_conn_id='s3conn',
        dag=dag,
        wildcard_match=True)
    task.set_upstream(upstream)
    return task

# Spark module to classify data

bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
sparkcmd = """
           cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python
 NbRunner.py;
           aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region
us-west-1
"""

t3 = BashOperator(
    task_id='CNs',
    bash_command=sparkcmd,
    params={"bkey":bucket_key3},
    retries=1,
    dag=dag)

t2 = detect_s3('t2')

t3.set_upstream(t2)


Thanks,
MSR
