airflow-users mailing list archives

From Dmitriy Nikolaev <etsu.nikol...@gmail.com>
Subject Re: Re: How to access external arguments when passing them using REST API in airflow?
Date Mon, 06 Jul 2020 19:45:58 GMT
Hi, Ayush! Just use kwargs['dag_run'].conf['message'] (with your own key, e.g. 'input_path', in place of 'message').
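
A minimal sketch of what that could look like inside the callable, assuming Airflow 1.10 with provide_context=True as in your DAG. The stand-in object and the example paths are assumptions that only exist so the snippet runs without a scheduler; in a real task Airflow supplies kwargs['dag_run']. Note that kwargs['conf'] in the task context is Airflow's own configuration object, not the payload you posted.

```python
import logging
from types import SimpleNamespace


def run_this_func(**kwargs):
    """Read the trigger payload from the DagRun passed in the task context."""
    dag_run = kwargs.get("dag_run")
    # dag_run.conf can be None when the run was started without a payload,
    # so fall back to an empty dict before reading keys.
    run_conf = (dag_run.conf if dag_run is not None else None) or {}
    for key, value in run_conf.items():
        logging.info("%s = %s", key, value)
    return run_conf.get("input_path"), run_conf.get("output_path")


# Stand-in for the DagRun object Airflow would supply (assumption for the demo).
fake_dag_run = SimpleNamespace(
    conf={"input_path": "/data/in", "output_path": "/data/out"}
)
paths = run_this_func(dag_run=fake_dag_run)
```

Using .get() on the payload means a run triggered without those keys yields None instead of raising KeyError.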
See https://www.astronomer.io/guides/trigger-dag-operator/
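
On the triggering side, the inner quotes of the "conf" value have to be escaped when it is sent as a string, which is easy to get wrong by hand in curl. Here is a small sketch of building the request body with json.dumps. It assumes the experimental endpoint from your message accepts "conf" as a JSON-encoded string. build_trigger_body is a hypothetical helper name, and nothing is actually sent here.

```python
import json


def build_trigger_body(run_conf):
    """Return the JSON request body for triggering a DAG run with run_conf.

    The conf dict is encoded twice: once into a string for the "conf"
    field, then again as part of the outer JSON document.
    """
    return json.dumps({"conf": json.dumps(run_conf)})


body = build_trigger_body({"input_path": "value", "output_path": "value"})
print(body)  # pass this string to curl -d or any HTTP client
```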

On Mon, Jul 6, 2020 at 21:41, Ayush Goel <ayushgoel1594@gmail.com> wrote:

> Hi,
>
> Thanks, Dileep, for clarifying my question. That is exactly what I
> wanted to ask :)
>
> Thanks and Regards,
> Ayush Goel
>
>
> On Tue, Jul 7, 2020 at 12:08 AM Dileep Kancharla <
> dileep.kancharla18@gmail.com> wrote:
>
>> Hi,
>> Apologies if I am wrong!
>>
>> I think the question was how to pass input arguments through a REST API
>> call, and how to access those values inside the DAG.
>>
>> Thanks
>> Dileep
>>
>> On Mon, Jul 6, 2020 at 11:51 PM Li, Richard <rliaa@allstate.com> wrote:
>>
>>> Reference the variable's value (the “val” you defined) in your DAG and
>>> pass it to the command line, for example:
>>>
>>> bash_command="/path/scriptname " + keyname,
>>>
>>> Richard Li
>>>
>>> Big Data Engineer @ Product/Service Innovation Development
>>>
>>> 222 W. Merchandise Mart Plaza | Suite 850 | Chicago, Illinois 60654
>>>
>>> 6308632669
>>>
>>>
>>>
>>>
>>>
>>> *From: *Ayush Goel <ayushgoel1594@gmail.com>
>>> *Date: *Monday, July 6, 2020 at 12:59 PM
>>> *To: *Richard Li <rliaa@allstate.com>
>>> *Cc: *"users@airflow.apache.org" <users@airflow.apache.org>
>>> *Subject: *[External] Re: How to access external arguments when passing
>>> them using REST API in airflow?
>>>
>>>
>>>
>>>  Hi Richard,
>>>
>>>
>>>
>>> The arguments which we are passing are dynamic and the user does not
>>> have access to UI, so he cannot change the variable values each time.
>>>
>>>
>>>
>>> We want to give the rest api interface to the user through which he can
>>> trigger the dag by himself while passing some user specific arguments.
>>>
>>>
>>>
>>> Thanks and Regards,
>>>
>>> Ayush Goel
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jul 6, 2020 at 11:14 PM Li, Richard <rliaa@allstate.com> wrote:
>>>
>>> From the “Admin” menu, choose “Variables” and enter your “key” and “val”
>>> values. Then, in your Python script, add
>>>
>>> keyname = Variable.get("key")
>>>
>>> and use keyname in your DAGs.
>>>
>>>
>>>
>>>
>>>
>>> Richard Li
>>>
>>> *From: *Ayush Goel <ayushgoel1594@gmail.com>
>>> *Reply-To: *"users@airflow.apache.org" <users@airflow.apache.org>
>>> *Date: *Monday, July 6, 2020 at 12:36 PM
>>> *To: *"users@airflow.apache.org" <users@airflow.apache.org>
>>> *Subject: *[External] How to access external arguments when passing
>>> them using REST API in airflow?
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I tried triggering an Airflow DAG using curl:
>>>
>>> curl http://localhost:8080/api/experimental/dags/api_test/dag_runs \
>>>   -H 'Cache-Control: no-cache' \
>>>   -H 'Content-Type: application/json' \
>>>   -d '{"conf":"{\"input_path\":\"value\", \"output_path\":\"value\"}"}'
>>>
>>> I am able to trigger the workflow successfully but now I want to access
>>> this input_path and output_path inside my PythonOperator code in the
>>> airflow dag.
>>>
>>> Can someone help in this regard?
>>>
>>> My Dag Code:
>>>
>>> from datetime import timedelta
>>> import logging
>>>
>>> # The DAG object; we'll need this to instantiate a DAG
>>> from airflow import DAG
>>> # Operators; we need this to operate!
>>> from airflow.operators.python_operator import PythonOperator
>>> from airflow.utils.dates import days_ago
>>>
>>> # These args will get passed on to each operator
>>> # You can override them on a per-task basis during operator initialization
>>> default_args = {
>>>     'owner': 'someone',
>>>     'depends_on_past': False,
>>>     'start_date': days_ago(2),
>>>     'email': ['xyz@xyz.com'],
>>>     'email_on_failure': False,
>>>     'email_on_retry': False,
>>>     'retries': 1,
>>>     'retry_delay': timedelta(minutes=2)
>>> }
>>>
>>> dag = DAG(
>>>     'api_test',
>>>     default_args=default_args,
>>>     description='testing rest api calls',
>>>     schedule_interval=None,
>>> )
>>>
>>> def run_this_func(**kwargs):
>>>     logging.info("Trying to print the logs")
>>>     logging.info(kwargs['conf'])
>>>     for key,value in kwargs['conf'].items():
>>>         logging.info(key)
>>>         logging.info(value)
>>>     #logging.info(kwargs['dag_run'].conf['input_path'])
>>>     #logging.info(kwargs['conf']['output_path'])
>>>
>>> run_this = PythonOperator(
>>>     task_id='run_this',
>>>     python_callable=run_this_func,
>>>     dag=dag,
>>>     provide_context=True,
>>> )
>>>
>>> run_this
>>>
>>> I tried printing the arguments with
>>> logging.info(kwargs['dag_run'].conf['input_path']) and
>>> logging.info(kwargs['conf']['output_path']).
>>>
>>> But both of these lines are giving me errors. Is there any way of
>>> accessing the input_path and output_path?
>>>
>>>
>>> Thanks and Regards,
>>>
>>> Ayush Goel
>>>
>>>

-- 
Best regards, Dmitriy Nikolaev
