airflow-dev mailing list archives

From Joseph Napolitano <joseph.napolit...@blueapron.com.INVALID>
Subject Re: Suggested way of passing "input parameters" to a DAG run?
Date Wed, 03 Aug 2016 22:06:54 GMT
There are a lot of ways to define the input source.  Let's suppose you have
these inputs in a relational database, or a flat file on S3.

The first task in your DAG would query for those inputs or grab the file.
The trick is getting the inputs to later tasks.  XCom is Airflow's way to
share data between tasks, so later tasks just pull the XCom pushed by the
task that originally queried the inputs.

Suppose you had an "input operator"

from airflow.models import BaseOperator

class InputOperator(BaseOperator):
    def execute(self, context):
        # Query the database or grab the file here. Whatever you return
        # is retrievable in later tasks through XCom.
        return {"input_key": "input_value"}

Then in your DAG

# task_id must match the task_ids used in the xcom_pull below
input_operator_task = InputOperator(task_id='input_operator_task', dag=dag)

downstream_task = SomeExistingOperator(
    task_id='downstream_task',
    keyword_arg_using_your_inputs="{{ ti.xcom_pull(task_ids='input_operator_task') }}",
    dag=dag
)

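The XCom pull is evaluated when Airflow renders the Jinja template at task
run time.  Note that this only works if the operator lists that keyword
argument in its template_fields, and that the pulled value is rendered into
the argument as a string.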
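
If the downstream task is plain Python, it may be simpler to pull the XCom directly from the task instance instead of going through a template, which also keeps the dict as a dict. Below is a minimal sketch of that variant, not a definitive implementation. The function names, the DAG id, and the payload are hypothetical, and the PythonOperator import path and the provide_context flag are assumptions based on Airflow 1.x; newer releases may differ.

```python
from datetime import datetime


def load_inputs():
    # Stand-in for querying a database or grabbing a file from S3.
    # The callable's return value is pushed to XCom by the operator.
    return {"input_key": "input_value"}  # hypothetical payload


def use_inputs(**context):
    # 'ti' is the running TaskInstance that Airflow passes in the context.
    ti = context["ti"]
    inputs = ti.xcom_pull(task_ids="input_operator_task")
    return "got {}".format(inputs["input_key"])


def build_dag():
    # Imports are local so the callables above can be exercised without Airflow.
    # Import path and provide_context are assumptions based on Airflow 1.x.
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        "input_params_example",  # hypothetical DAG id
        start_date=datetime(2016, 8, 1),
        schedule_interval=None,
    )
    load = PythonOperator(
        task_id="input_operator_task",
        python_callable=load_inputs,
        dag=dag,
    )
    consume = PythonOperator(
        task_id="downstream_task",
        python_callable=use_inputs,
        provide_context=True,
        dag=dag,
    )
    # Run the consumer only after the inputs have been loaded.
    consume.set_upstream(load)
    return dag
```

In a real DAG file you would assign the result at module level (dag = build_dag()) so the scheduler can discover it.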

Let me know if that helps, or if I completely misunderstood :)

Joe Nap

On Wed, Aug 3, 2016 at 5:29 PM, Andrew Phillips <andrewp@apache.org> wrote:

> Hi all
>
> What is/are the suggested way(s) of passing "input parameters" to a DAG
> run (adding quotes since, as far as we can tell, that concept doesn't exist
> natively in Airflow, probably by design)?
>
> This would be information that is used by one or multiple operators in a
> DAG run and that should not change for all task instances in that DAG run,
> but may be different for another DAG run executing concurrently. An example
> would be a Git pull request number.
>
> What we tried first was to use a Variable for this, but it doesn't look
> like that will work because the value can change during the execution of
> the DAG run. At least, that seems to be the case in the way we're using it:
>
> input_params = Variable.get(<variable_for_dag>)
> dag = DAG(..., params=input_params)
>
> We had hoped that this would "fix" the values of the parameters when the
> DAG run was created, but that does not seem to be the case: if the variable
> is updated (in preparation for a new DAG run) while a DAG run is active,
> tasks that haven't executed yet see the new value. I.e. we end up seeing
> this:
>
> set Variable my_param to "foo"
> dag_run_1 starts, gets the variable and passes my_param to the Dag object
> dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
> set Variable my_param to "bar"
> dag_run_2 starts and passes var to the Dag object
> dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar" # want this
> to still be foo!
>
> Not sure at this point whether this is a bug or, if not, whether there's a
> different way to retrieve the value of a variable that allows us to "fix"
> it for the duration of the DAG run.
>
> Or, taking a step back, is there some other approach that we could use to
> store and retrieve input data to DAGs?
>
> Regards
>
> ap
>
>
>


-- 
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013
