airflow-dev mailing list archives

From Joseph Napolitano
Subject Re: Suggested way of passing "input parameters" to a DAG run?
Date Wed, 03 Aug 2016 22:06:54 GMT
There are a lot of ways to define the input source.  Let's suppose you have
these inputs in a relational database, or a flat file on S3.

The first task in your DAG would query for those inputs or grab the file.
The trick is getting the inputs to later tasks.  XCom is Airflow's way of
sharing data between tasks, so each later task just pulls the XCom that was
pushed by the task that originally queried the inputs.

Suppose you had an "input operator":

from airflow.models import BaseOperator

class InputOperator(BaseOperator):
    # ... rest of the operator elided; it needs an execute method ...
    def execute(self, context):
        # Whatever you return is retrievable in later tasks through XCom.
        return {"input_key": "input_value"}

Then in your DAG:

input_operator_task = ... your InputOperator ....

downstream_task = SomeExistingOperator(


The XCom pull is written as a Jinja expression in one of the downstream
operator's templated fields, and it is evaluated when that task runs.
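
To make it clearer why this keeps concurrent DAG runs from seeing each
other's inputs, here is a toy model in plain Python. It is only a sketch of
the idea and not Airflow code: XComStore, run_task, and the run and task ids
are hypothetical names made up for illustration. The assumption it encodes is
that a value returned by a task is stored per DAG run and per task, so a pull
only ever returns what that same run pushed.

```python
# Toy model of per-run XCom storage. NOT Airflow's implementation:
# XComStore, run_task and the ids used below are hypothetical.
from typing import Any, Callable, Dict, Tuple


class XComStore:
    """Keeps task return values keyed by (run_id, task_id)."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, run_id: str, task_id: str, value: Any) -> None:
        self._values[(run_id, task_id)] = value

    def pull(self, run_id: str, task_id: str) -> Any:
        return self._values[(run_id, task_id)]


def run_task(store: XComStore, run_id: str, task_id: str,
             execute: Callable[[], Any]) -> Any:
    """Run a task and store whatever it returns, mirroring execute()."""
    value = execute()
    store.push(run_id, task_id, value)
    return value


store = XComStore()

# Each run's input task reads its own inputs once, at the start of the run.
run_task(store, "dag_run_1", "input_task", lambda: {"my_param": "foo"})
run_task(store, "dag_run_2", "input_task", lambda: {"my_param": "bar"})

# A later task in dag_run_1 still sees the value captured by its own run,
# even though dag_run_2 started in the meantime with a different input.
late_value = store.pull("dag_run_1", "input_task")["my_param"]
other_value = store.pull("dag_run_2", "input_task")["my_param"]
print(late_value, other_value)
```

Contrast this with a shared Variable, which behaves like a single global
slot: any update is visible to every task that reads it afterwards,
whichever run that task belongs to.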

Let me know if that helps, or if I completely misunderstood :)

Joe Nap

On Wed, Aug 3, 2016 at 5:29 PM, Andrew Phillips wrote:

> Hi all
> What is/are the suggested way(s) of passing "input parameters" to a DAG
> run (adding quotes since, as far as we can tell, that concept doesn't exist
> natively in Airflow, probably by design)?
> This would be information that is used by one or multiple operators in a
> DAG run and that should not change for all task instances in that DAG run,
> but may be different for another DAG run executing concurrently. An example
> would be a Git pull request number.
> What we tried first was to use a Variable for this, but it doesn't look
> like that will work because the value can change during the execution of
> the DAG run. At least, that seems to be the case in the way we're using it:
>
>     input_params = Variable.get(<variable_for_dag>)
>     dag = DAG(..., params=input_params)
>
> We had hoped that this would "fix" the values of the parameters when the
> DAG run was created, but that does not seem to be the case: if the variable
> is updated (in preparation for a new DAG run) while a DAG run is active,
> tasks that haven't executed yet see the new value. I.e. we end up seeing
> this:
>
>     set Variable my_param to "foo"
>     dag_run_1 starts, gets the variable and passes my_param to the Dag object
>     dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
>     set Variable my_param to "bar"
>     dag_run_2 starts and passes var to the Dag object
>     dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar"  # want this to still be "foo"!
>
> Not sure at this point whether this is a bug or, if not, whether there's a
> different way to retrieve the value of a variable that allows us to "fix"
> it for the duration of the DAG run.
> Or, taking a step back, is there some other approach that we could use to
> store and retrieve input data to DAGs?
> Regards
> ap

*Joe Napolitano *| Sr. Data Engineer | 5 Crosby Street, New York, NY 10013
