airflow-dev mailing list archives

From Lance Norskog <lance.nors...@gmail.com>
Subject Re: Creating and accessing different variable values for each instance of DAG run
Date Wed, 13 Jul 2016 23:25:31 GMT
Another way to do this is for t3 to do an S3 directory listing and find the
latest directories that need processing. When t3 finishes processing a
directory produced by t1, it deletes it. t1 should add a final marker file to
each directory that means "I'm finished, take this directory".

This is a very fail-safe approach. It changes the model from a pipeline:
  t1[time 1] supplies data to -> t3[time 1]
to:
  t1[time 1], t1[time 2], ... t1[time n] all supply data to -> t3[time n]
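
Roughly, t3's logic could look something like the sketch below (boto3, not
tested; the bucket, prefix, marker name, and process_directory() are
placeholders for whatever you actually use):

    import boto3

    s3 = boto3.client('s3')
    BUCKET = 'aaaaa'      # placeholder bucket
    PREFIX = 'bbbbb/'     # placeholder prefix

    def process_ready_directories():
        # List the "directories" (common prefixes) under the drop location.
        resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX, Delimiter='/')
        for cp in resp.get('CommonPrefixes', []):
            directory = cp['Prefix']
            # Only take directories that t1 has marked as finished.
            marker = s3.list_objects_v2(Bucket=BUCKET,
                                        Prefix=directory + '_FINISHED')
            if marker.get('KeyCount', 0) == 0:
                continue
            process_directory(directory)  # your Spark / classification step
            # Delete the directory once it has been processed.
            done = s3.list_objects_v2(Bucket=BUCKET, Prefix=directory)
            for obj in done.get('Contents', []):
                s3.delete_object(Bucket=BUCKET, Key=obj['Key'])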

On Mon, Jul 11, 2016 at 5:43 PM, MSR M <msrmaillist@gmail.com> wrote:

> Hi,
>
> I need some advice on solving a problem with local variables in a DAG.
>
> I have a DAG (schedule interval 30 mins). It has 3 tasks. t1 runs a
> Python program on a remote EC2 instance. t2 waits for S3 file availability
> at a particular location. This S3 file is created by t1. Once the S3 file is
> available, t3 runs and processes the file on S3.
>
> I have date-time as part of my S3 file location.
>
> dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
>
> bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
>
> t1 runs for more than 1 hour, so a second instance of the DAG has already
> started and changes the dttm2 value, so the first run's t2 task is trying to
> locate the file at a different location.
>
> To overcome this I am planning to use the {{ execution_date }} parameter
> instead of computing the dttm2 value as shown above.
>
> In situations like these, is there any better approach to keep the same
> value in a variable throughout a particular run of the DAG?
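>
> For reference, the execution_date version I have in mind would look roughly
> like this (untested, and it assumes bucket_key is a templated field on
> S3KeySensor):
>
> bucket_key2 = ("s3://aaaaa/bbbbb/"
>                "{{ execution_date.strftime('%Y-%m-%d-%H-%M') }}/sucess")
>
> t2 = S3KeySensor(
>     task_id='t2',
>     bucket_key=bucket_key2,
>     s3_conn_id='s3conn',
>     wildcard_match=True,
>     dag=dag)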
>
> Or should I use the XCom feature to push and pull the values across tasks,
> with different keys for each run?
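>
> The XCom variant I am considering looks roughly like this (also untested;
> push_bucket_key and push_key are just placeholder names):
>
> from airflow.operators.python_operator import PythonOperator
>
> def push_key(**context):
>     dttm2 = context['execution_date'].strftime('%Y-%m-%d-%H-%M')
>     context['ti'].xcom_push(key='bucket_key2',
>                             value="s3://aaaaa/bbbbb/" + dttm2 + "/sucess")
>
> t0 = PythonOperator(
>     task_id='push_bucket_key',
>     python_callable=push_key,
>     provide_context=True,
>     dag=dag)
>
> # Downstream tasks can pull it back in any templated field, e.g.:
> #   {{ ti.xcom_pull(task_ids='push_bucket_key', key='bucket_key2') }}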
>
>
> Part of my DAG is given below:
>
> #
>
> dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
>
>
> NL = """
>           cd /home/ubuntu/Scripts/ ; python2 a11.py {{params.get("dttm2")}} ;
> """
>
> t1 = BashOperator(
>     task_id='E_Ns_A',
>     bash_command=NL,
>     params={'dttm2':dttm2},
>     retries=3,
>     dag=dag)
>
> bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
>
> def detect_s3(name, dag=dag, upstream=t1):
>   task = S3KeySensor(
>     task_id = name,
>     bucket_key=bucket_key2,
>     s3_conn_id='s3conn',
>     dag=dag,
>     wildcard_match=True)
>   task.set_upstream(upstream)
>   return task
>
> # Spark module to classify data
>
> bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
> sparkcmd = """
>            cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py;
>            aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region us-west-1
> """
>
> t3 = BashOperator(
>     task_id='CNs',
>     bash_command=sparkcmd,
>     params={"bkey":bucket_key3},
>     retries=1,
>     dag=dag)
>
> t2 = detect_s3('t2')
>
> t3.set_upstream(t2)
>
>
> Thanks,
> MSR
>



-- 
Lance Norskog
lance.norskog@gmail.com
Redwood City, CA
