airflow-dev mailing list archives

From Kaxil Naik <kaxiln...@gmail.com>
Subject Re: [DISCUSS] Parametrized DAGs
Date Mon, 15 Jun 2020 23:38:26 GMT
Oh yes, that sounds good. +1 to the idea: as long as it can return a
JSON-serializable object, I am fine with it.
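
A minimal sketch of that constraint, in plain Python with no Airflow
imports, might look like the following. The names DagParamSketch and
FakeDagRun are made up for illustration, and the fallback to the default
value is an assumption, not part of the quoted proposal.

```python
import json
from typing import Any, Dict, Optional


class FakeDagRun:
    """Hypothetical stand-in for a DagRun; it only carries the conf dict."""

    def __init__(self, conf: Optional[Dict[str, Any]] = None) -> None:
        self.conf = conf or {}


class DagParamSketch:
    """Illustrative only; not the class proposed in the thread."""

    def __init__(self, key: str, default: Any = None) -> None:
        # Fail early if the default itself cannot be encoded as JSON.
        json.dumps(default)
        self.key = key
        self.default = default

    def resolve(self, dag_run: FakeDagRun) -> Any:
        # Assumption: fall back to the default when the key is absent.
        value = dag_run.conf.get(self.key, self.default)
        # json.dumps raises TypeError for values it cannot encode.
        json.dumps(value)
        return value


param = DagParamSketch(key="number", default=3)
from_default = param.resolve(FakeDagRun())
from_conf = param.resolve(FakeDagRun({"number": 7}))
```

Checking with json.dumps both at construction and at resolve time is one
possible design: it rejects a bad default when the DAG file is parsed and a
bad run-time value when the task executes.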

On Tue, Jun 16, 2020 at 12:29 AM Gerard Casas Saez
<gcasassaez@twitter.com.invalid> wrote:

> By XCom support before XComArg, I meant XCom values as parameters for
> operators. You needed to use {{ context['ti'].xcom_pull(…) }} instead of
> XComArg objects, as you can on the latest master.
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
> On Jun 15, 2020, 5:02 PM -0600, Kaxil Naik <kaxilnaik@gmail.com>, wrote:
> > Isn't it already possible using params
> > (https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L138-L141)?
> >
> > Sample Usage:
> > https://gist.github.com/kaxil/335d90da8821a4e515046ff0f470fc97#file-airflow_params_usage_2-py
> >
> > Currently, we allow passing params in the DAG and overriding the params
> > using dagrun_conf via the CLI or UI:
> >
> > Code:
> >
> > - https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1335-L1336
> > - https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1454-L1456
> >
> >
> > Or am I missing something?
> >
> > Regards,
> > Kaxil
> >
> > On Mon, Jun 15, 2020 at 11:48 PM Gerard Casas Saez
> > <gcasassaez@twitter.com.invalid> wrote:
> >
> > > I do not think we should support using RunTimeParams to modify the
> > > topology (at least at the beginning).
> > >
> > > Modifying the topology involves considerably deeper changes. Even
> > > though it may be useful, I believe the time cost is high relative to
> > > the value, so enabling parametrization on a fixed topology is
> > > definitely an easier first step and will probably bring enough value.
> > >
> > > Curious what other people's thoughts are on this.
> > >
> > > Gerard Casas Saez
> > > Twitter | Cortex | @casassaez
> > > On Jun 12, 2020, 10:00 AM -0600, Dan Davydov
> > > <ddavydov@twitter.com.invalid>, wrote:
> > > > I think this is a great idea! One thing that I think we should figure
> > > > out before implementing is how to do so alongside DAG serialization,
> > > > i.e. letting these params modify DAG topology might make it hard to
> > > > store serialized representations for the Airflow services to consume
> > > > and render, though that may be more of a statement about the dagrun
> > > > configuration and orthogonal to the change proposed here.
> > > >
> > > > On Thu, Jun 11, 2020 at 7:58 PM Gerard Casas Saez
> > > > <gcasassaez@twitter.com.invalid> wrote:
> > > >
> > > > > > As we wrap up the work on AIP-31 (functional definition), I wanted
> > > > > > to bring another idea here for discussion.
> > > > > >
> > > > > > The concept is to parametrize pipelines using a class similar to
> > > > > > the XComArg that we introduced recently. As of 1.10.10, we can use
> > > > > > the UI to set the DagRun configuration on the trigger DAG view
> > > > > > using a JSON blob.
> > > > > >
> > > > > > Accessing those values is still hard (you need to pull the DagRun
> > > > > > from the current context and then access the conf object). My
> > > > > > proposal would be to add a new class that is resolved on execution,
> > > > > > similar to how we resolve XComArgs.
> > > > >
> > > > > > class DAGParam:
> > > > > >     def __init__(self, key: str, default: Any, type: type):
> > > > > >         self.key = key
> > > > > >         self.default = default
> > > > > >         self.type = type
> > > > > >
> > > > > >     def resolve(self, dag_run: DagRun):
> > > > > >         # Look the value up in the DagRun configuration
> > > > > >         return dag_run.conf[self.key]
> > > > > >
> > > > > > # Raw usage:
> > > > > > with DAG(...) as dag:
> > > > > >     param = DAGParam(key='number', default=3, type=int)
> > > > > >     SomeOperator(num=param)
> > > > > >
> > > > > > # From DAG object:
> > > > > > with DAG(...) as dag:
> > > > > >     SomeOperator(num=dag.param(key='number', default=3, type=int))
> > > > > >
> > > > > > # Decorator approach:
> > > > > > @dag(...)
> > > > > > def my_dag(number: int = 3):
> > > > > >     SomeOperator(num=number)
> > > > >
> > > > >
> > > > > > Gist: https://gist.github.com/casassg/aa29b4d5d7f07f16630e591e351e570a
> > > > >
> > > > > > This would allow us to discover these params and surface them in
> > > > > > the Trigger DAG UI
> > > > > > <https://airflow.apache.org/blog/airflow-1.10.10/#allow-passing-dagrun-conf-when-triggering-dags-via-ui>
> > > > > > as a better form, similar to what we currently have at Twitter (see
> > > > > > DagConstructors here
> > > > > > <https://blog.twitter.com/engineering/en_us/topics/insights/2018/ml-workflows.html>
> > > > > > or image attached).
> > > > > >
> > > > > > Just wanted to drop this here to get people's thoughts!
> > > > >
> > > > > The idea is heavily inspired by Kubeflow PipelinesParams + pipeline
> > > > > decorator.
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez <https://twitter.com/casassaez>
> > > > >
> > >
>
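
The params override described in the quoted reply (DAG-level params replaced
by the dag run conf passed via the CLI or UI) can be pictured as a plain
dictionary merge. This is only a sketch in standalone Python: the function
name effective_params and the conf_overrides_params flag are hypothetical
stand-ins, not Airflow's actual code or setting names.

```python
from typing import Any, Dict, Optional


def effective_params(
    dag_params: Dict[str, Any],
    run_conf: Optional[Dict[str, Any]],
    conf_overrides_params: bool = True,
) -> Dict[str, Any]:
    """Return the params a task would see for one run (illustrative)."""
    # Copy first so the DAG-level defaults are never mutated.
    merged = dict(dag_params)
    # Hypothetical flag: only let the run conf win when overriding is enabled.
    if conf_overrides_params and run_conf:
        merged.update(run_conf)
    return merged


defaults = {"number": 3, "mode": "fast"}
overridden = effective_params(defaults, {"number": 7})
untouched = effective_params(defaults, None)
```

Keys present only in the run conf are added as well, which is one reason the
thread discusses declaring params explicitly so they can be discovered.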
