airflow-users mailing list archives

From "Li, Richard" <rl...@allstate.com>
Subject Re: How to access external arguments when passing them using REST API in airflow?
Date Mon, 06 Jul 2020 17:44:15 GMT
From the “Admin” menu, choose “Variables” and enter your “key” and “val” values. In your
Python scripts, import Variable (from airflow.models import Variable), add
keyname = Variable.get("key"), and use keyname in your DAGs.
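
Variables set this way are shared by every run. For values sent per run in the REST payload, as in the question below, one possible alternative is to read them from the DagRun object in the task context. In Airflow 1.10, kwargs['conf'] is, as far as I know, the Airflow configuration object rather than the payload, which may explain one of the errors. The sketch below is not a definitive fix: build_payload and the SimpleNamespace stand-in are hypothetical helpers for a local check, and the paths are made-up values. It also builds the request body with json.dumps, since the inner quotes of a "conf" string have to be escaped.

```python
import json
from types import SimpleNamespace


def build_payload(input_path, output_path):
    """Build a request body whose "conf" value is a JSON-encoded string."""
    conf = {"input_path": input_path, "output_path": output_path}
    # The inner json.dumps produces the conf string; the outer one escapes its quotes.
    return json.dumps({"conf": json.dumps(conf)})


def run_this_func(**kwargs):
    """Read per-run parameters from dag_run.conf instead of kwargs['conf']."""
    dag_run = kwargs.get("dag_run")
    # dag_run.conf is None when the run was triggered without a payload.
    run_conf = (dag_run.conf if dag_run is not None else None) or {}
    return run_conf.get("input_path"), run_conf.get("output_path")


# Local check only: a stand-in for the DagRun that Airflow would pass in the context.
body = build_payload("/data/in", "/data/out")
fake_dag_run = SimpleNamespace(conf=json.loads(json.loads(body)["conf"]))
paths = run_this_func(dag_run=fake_dag_run)
print(paths)
```

In a DAG, run_this_func would be passed as python_callable to a PythonOperator with provide_context=True, as in the code below. Using .get() avoids a KeyError when a key is missing from the payload.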


Richard Li
Big Data Engineer @ Product/Service Innovation Development
222 W. Merchandise Mart Plaza | Suite 850 | Chicago, Illinois 60654
6308632669


From: Ayush Goel <ayushgoel1594@gmail.com>
Reply-To: "users@airflow.apache.org" <users@airflow.apache.org>
Date: Monday, July 6, 2020 at 12:36 PM
To: "users@airflow.apache.org" <users@airflow.apache.org>
Subject: [External] How to access external arguments when passing them using REST API in airflow?

Hi,


I tried triggering an Airflow DAG using curl: http://localhost:8080/api/experimental/dags/api_test/dag_runs
-H 'Cache-Control: no-cache'
-H 'Content-Type: application/json'
-d '{"conf":"{"input_path":"value", "output_path":"value" }"}'

I am able to trigger the workflow successfully but now I want to access this input_path and
output_path inside my PythonOperator code in the airflow dag.

Can someone help in this regard?

My Dag Code:

from datetime import timedelta
import logging

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'someone',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['xyz@xyz.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

dag = DAG(
    'api_test',
    default_args=default_args,
    description='testing rest api calls',
    schedule_interval=None,
)

def run_this_func(**kwargs):
    logging.info("Trying to print the logs")
    logging.info(kwargs['conf'])
    for key,value in kwargs['conf'].items():
        logging.info(key)
        logging.info(value)
    #logging.info(kwargs['dag_run'].conf['input_path'])
    #logging.info(kwargs['conf']['output_path'])

run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag,
    provide_context=True,
)

run_this

I tried printing the arguments with logging.info(kwargs['dag_run'].conf['input_path'])
and logging.info(kwargs['conf']['output_path']).

But both of these lines are giving me errors. Is there any way of accessing the input_path
and output_path?

Thanks and Regards,
Ayush Goel