airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Galak (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields
Date Tue, 14 Nov 2017 22:25:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252547#comment-16252547
] 

Galak commented on AIRFLOW-1814:
--------------------------------

I discovered a blocker issue for this change while using the work-around class:
{code}
class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

all non string arguments passed to this operator will make the task fail with an {{AirflowException}}
raised by {{BaseOperator.render_template_from_field}}...

example:
{code}
value_consumer_task = MyPythonOperator(
        task_id='value_consumer_task',
        provide_context=True,
        python_callable=consume_value,
        op_args=[
            "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}",
            3  # // << this argument makes the task fail with message: airflow.exceptions.AirflowException:
Type '<class 'int'>' used for parameter 'op_args' is not supported for templating
        ],
    dag=dag
)
{code}

Would it be possible for {{BaseOperator.render_template_from_field}} to log a warning and
simply return the value itself when it is not a string (nor a collection, nor a dictionary)?

At this point, I should probably work on the change and send a pull request, isn't it? But
I don't have a lot of free time, so I would like to be sure I'm not going in a wrong direction...



> Add op_args and op_kwargs in PythonOperator templated fields
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-1814
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
>             Project: Apache Airflow
>          Issue Type: Wish
>          Components: operators
>    Affects Versions: Airflow 1.8, 1.8.0
>            Reporter: Galak
>            Priority: Minor
>
> *I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could be templated.*
> I have 2 different use cases where this change could help a lot:
> +1/ Provide some job execution information as a python callable argument:+
> let's explain it through a simple example:
> {code}
> simple_task = PythonOperator(
>     task_id='simple_task',
>     provide_context=True,
>     python_callable=extract_data,
>     op_args=[
> 	"my_db_connection_id"
> 	"select * from my_table"
> 	"/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
>     ],
>     dag=dag
> )
> {code}
> "extract_data" python function seems to be simple here, but it could be anything re-usable
in multiple dags...
> +2/ Provide some XCom value as a python callable argument:+
> Let's say I a have a task which is retrieving or calculating a value, and then storing
it in an XCom for further use by other tasks:
> {code}
> value_producer_task = PythonOperator(
>     task_id='value_producer_task',
>     provide_context=True,
>     python_callable=produce_value,
>     op_args=[
> 	"my_db_connection_id",
> 	"some_other_static_parameter",
> 	"my_xcom_key"
>     ],
>     dag=dag
> )
> {code}
> Then I can just configure a PythonCallable task to use the produced value:
> {code}
> value_consumer_task = PythonOperator(
>     task_id='value_consumer_task',
>     provide_context=True,
>     python_callable=consume_value,
>     op_args=[
> 	"{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
>     ],
>     dag=dag
> )
> {code}
> I quickly tried the following class:
> {code}
> from airflow.operators.python_operator import PythonOperator
> class MyPythonOperator(PythonOperator):
>     template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
> {code}
> and it worked like a charm.
> So could these 2 arguments be added to templated_fields? Or did I miss some major drawback
to this change?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message