Date: Thu, 28 Apr 2016 21:14:12 +0000 (UTC)
From: "Chris Riccomini (JIRA)"
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Subject: [jira] [Commented] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?

    [ https://issues.apache.org/jira/browse/AIRFLOW-19?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263013#comment-15263013 ]

Chris Riccomini commented on AIRFLOW-19:
----------------------------------------

Code from gist is:

{code}
t1 = DistCpSensor(
    task_id='sensor',
    lookback=2,
    distcp_dao=distcp_dao,
    source_conn=source_conn,
    parent_paths=['/tmp/testAirflowDistCp'],
    timeout=12*60*60,
    dag=dag)

def run_distcp_on_each(*args, **kwargs):
    ti = kwargs['ti']
    pprint(kwargs)
    to_process = ti.xcom_pull(task_ids=None, key='to_process')
    pprint(to_process)
    for work in to_process:
        t3 = DistCpOperator(
            task_id='distcp_command',
            source_conn=source_conn,
            work=work,
            dag=dag)

        t4 = BashOperator(
            task_id='run_distcp',
            bash_command="{{ ti.xcom_pull(task_ids='distcp_command') }}",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)

        t5 = BashOperator(
            task_id='get_application_id',
            bash_command="echo {{ ti.xcom_pull(task_ids='run_distcp') }} | awk '{print $NF}' | sed 's/job/application/g'",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)

        t6 = DistCpMonitor(
            task_id='monitor',
            application_id="{{ ti.xcom_pull(task_ids='get_application_id') }}",
            resource_manager_conn=resource_manager_conn,
            dag=dag)

        t7 = DistCpSensorCompletionOperator(
            task_id='mysql_update',
            distcp_dao=distcp_dao,
            dag=dag)

        t3.set_upstream(t2)
        t4.set_upstream(t3)
        t5.set_upstream(t4)
        t6.set_upstream(t5)
        t7.set_upstream(t6)

t2 = PythonOperator(
    task_id='run_distcp_on_each',
    provide_context=True,
    python_callable=run_distcp_on_each,
    dag=dag)

t2.set_upstream(t1)
{code}

> How can I have an Operator B iterate over a list returned from upstream by Operator A?
> --------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-19
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-19
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Praveenkumar Venkatesan
>            Priority: Minor
>              Labels: support
>
> Here is exactly what I am trying to do: https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a
> The Python operator log just returns the following:
> [2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing on 2016-04-28 11:56:12
> [2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value was: None
> It didn't even print my kwargs and to_process data.
> To simplify this: let's say t1 returns 3 elements. I want to iterate over the list and run t2 -> t3 for each element.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
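
To make the requested fan-out concrete, here is a minimal sketch in plain Python (no Airflow imports) of the shape described in the issue: one chain of steps per element of the upstream list. The helper `build_chains` and the sample paths are hypothetical; the step names are the task_ids from the gist. The sketch only computes task ids and upstream edges, and it assumes the list is already known when the graph is built, which is not the case in the gist, where the list only arrives at run time through XCom. Each chain gets a per-item suffix, because the loop in the quoted code reuses the same task_id values on every iteration.

```python
from typing import Dict, List, Optional

# Step names taken from the task_ids in the quoted gist.
STEPS = [
    "distcp_command",
    "run_distcp",
    "get_application_id",
    "monitor",
    "mysql_update",
]


def build_chains(root: str, work_items: List[str]) -> Dict[str, Optional[str]]:
    """Map each task id to the id of its single upstream task (None for the root).

    Hypothetical helper: it models the graph shape only and creates no operators.
    """
    upstream: Dict[str, Optional[str]] = {root: None}
    for index, _work in enumerate(work_items):
        previous = root
        for step in STEPS:
            # Suffix the step name so ids stay distinct across work items.
            task_id = f"{step}_{index}"
            upstream[task_id] = previous
            previous = task_id
    return upstream


if __name__ == "__main__":
    # Sample paths are made up for illustration.
    edges = build_chains("run_distcp_on_each", ["/tmp/a", "/tmp/b", "/tmp/c"])
    for task_id, parent in edges.items():
        print(f"{task_id} <- {parent}")
```

With three work items this yields three independent five-step chains, all hanging off `run_distcp_on_each`.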