From: MSR M
Date: Mon, 11 Jul 2016 17:43:50 -0700
Subject: Creating and accessing different variable values for each instance of DAG run
To: dev@airflow.incubator.apache.org
Hi,

I need some advice on solving a problem with local variables in a DAG.

I have a DAG (schedule interval: 30 minutes) with 3 tasks:
t1 runs a Python program on a remote EC2 instance.
t2 waits for a file to become available at a particular S3 location. This file is created by t1.
t3 runs once the S3 file is available and processes the file on S3.

The S3 file location includes a date-time:

    dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
    bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"

t1 runs for more than 1 hour, so by the time it finishes the second instance of the DAG has already started and the value of dttm2 has changed. As a result, task t2 of the first run (job1) tries to locate the file at a different location.

To overcome this, I am planning to use the {{execution_date}} template parameter instead of computing dttm2 as shown above.

In situations like this, is there a better approach to keep the same value in a variable throughout a particular DAG run? Or should I use the XCom feature to push and pull values across tasks, with a different key for each run?
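A minimal sketch of the execution_date idea, in plain Python so it runs without Airflow. The likely cause of the drift is that datetime.now() is module-level code: it is re-evaluated every time the DAG file is parsed, and in Airflow 1.x each task instance typically re-parses the file when it starts, so t1 and t2 of the same run can compute different dttm2 values. A run's execution_date, by contrast, is the same for every task instance in that run. The helpers success_key and output_prefix are hypothetical; in the real DAG the same string would come from a Jinja expression such as {{ execution_date.strftime('%Y-%m-%d-%H-%M') }} placed in t1's bash_command and in the sensor's bucket_key (assuming bucket_key is a templated field in your Airflow version; check S3KeySensor.template_fields).

```python
from datetime import datetime, timedelta

# Same format string the DAG uses for dttm2.
KEY_FORMAT = '%Y-%m-%d-%H-%M'


def success_key(execution_date):
    """Hypothetical helper: sensor key derived from the run's execution_date.

    In the DAG this would be rendered from a template such as
    {{ execution_date.strftime('%Y-%m-%d-%H-%M') }} instead of datetime.now().
    """
    return "s3://aaaaa/bbbbb/" + execution_date.strftime(KEY_FORMAT) + "/sucess"


def output_prefix(execution_date):
    """Hypothetical helper: prefix t3 copies its output to (bucket_key3)."""
    return "s3://aaaaa/bbbbb/" + execution_date.strftime(KEY_FORMAT) + "/"


if __name__ == "__main__":
    run1 = datetime(2016, 7, 11, 17, 0)
    run2 = run1 + timedelta(minutes=30)  # next run on a 30-minute schedule
    # Computing the key later (e.g. when t2 starts an hour after t1) still
    # gives run1's key, because it depends only on run1's execution_date.
    print(success_key(run1))  # s3://aaaaa/bbbbb/2016-07-11-17-00/sucess
    print(success_key(run2))  # s3://aaaaa/bbbbb/2016-07-11-17-30/sucess
```

Note that execution_date marks the start of the schedule interval, and a run is typically triggered at the end of that interval, so the timestamp in the key will not match the wall-clock start time. Values passed through params are fixed when the operator is constructed, so params alone do not solve the problem.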
Part of my DAG is given below:

    from datetime import datetime

    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.sensors import S3KeySensor

    # `dag` is the DAG object defined earlier in this file (not shown).

    dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')

    NL = """
    cd /home/ubuntu/Scripts/ ;
    python2 a11.py {{params.get("dttm2")}} ;
    """

    t1 = BashOperator(
        task_id='E_Ns_A',
        bash_command=NL,
        params={'dttm2': dttm2},
        retries=3,
        dag=dag)

    bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"

    def detect_s3(name, dag=dag, upstream=t1):
        task = S3KeySensor(
            task_id=name,
            bucket_key=bucket_key2,
            s3_conn_id='s3conn',
            dag=dag,
            wildcard_match=True)
        task.set_upstream(upstream)
        return task

    # Spark module to classify data
    bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
    sparkcmd = """
    cd /home/ubuntu/SC;
    /home/ubuntu/anaconda3/bin/python NbRunner.py;
    aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region us-west-1
    """

    t3 = BashOperator(
        task_id='CNs',
        bash_command=sparkcmd,
        params={"bkey": bucket_key3},
        retries=1,
        dag=dag)

    t2 = detect_s3('t2')
    t3.set_upstream(t2)

Thanks,
MSR
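On the XCom alternative: XCom values are already scoped to a DAG run (they are stored with the dag_id, task_id and execution_date, and xcom_pull by default only looks within the current run), so a different key for each run should not be needed. The toy model below is a hypothetical dict-based stand-in, not Airflow's actual XCom API; it only sketches that scoping, with two runs pushing under the same key and each pull seeing its own run's value. The dag_id "my_dag" is a placeholder.

```python
from datetime import datetime

# Hypothetical in-memory stand-in for Airflow's XCom storage. It models only
# the scoping: each value is keyed by (dag_id, task_id, execution_date, key).
_xcom_store = {}


def xcom_push(dag_id, task_id, execution_date, key, value):
    """Store a value for one task instance of one DAG run."""
    _xcom_store[(dag_id, task_id, execution_date, key)] = value


def xcom_pull(dag_id, task_id, execution_date, key):
    """Fetch a value from the same run only; returns None if absent."""
    return _xcom_store.get((dag_id, task_id, execution_date, key))


if __name__ == "__main__":
    run1 = datetime(2016, 7, 11, 17, 0)
    run2 = datetime(2016, 7, 11, 17, 30)
    # Both runs of t1 ('E_Ns_A') push under the same key; "my_dag" is a placeholder.
    xcom_push("my_dag", "E_Ns_A", run1, "dttm2", run1.strftime("%Y-%m-%d-%H-%M"))
    xcom_push("my_dag", "E_Ns_A", run2, "dttm2", run2.strftime("%Y-%m-%d-%H-%M"))
    # ...yet each run's t2 reads back only its own value.
    print(xcom_pull("my_dag", "E_Ns_A", run1, "dttm2"))  # 2016-07-11-17-00
    print(xcom_pull("my_dag", "E_Ns_A", run2, "dttm2"))  # 2016-07-11-17-30
```

Since execution_date already identifies the run, XCom is mostly worth the extra moving parts when the value cannot be derived from the run itself, for example a path that t1 only learns while it executes.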