airflow-dev mailing list archives

From: Andres Quiroz <andres.qui...@plumlending.com>
Subject: Running a DAG that is only triggered externally (i.e. manually)
Date: Wed, 14 Dec 2016 21:23:57 GMT
Hello,

I am new to Airflow and the mailing list, so please feel free to
redirect me to the appropriate channel if this is not the right one
for my questions. I will state the questions first, in order of
importance, and then elaborate below. Thanks in advance.

1) Is setting "schedule_interval" to None in a DAG definition and
using the "airflow trigger_dag" command from the CLI with the
corresponding dag_id the correct way to trigger a DAG manually, so
that all of its tasks are executed? If so, are those two conditions
sufficient, or is something missing?

2) How do I access the conf parameter passed to the trigger_dag
command in a BashOperator? (e.g. airflow trigger_dag --conf
'{"key":"value"}' dag_id)

Also, please note that I am a relative Python newbie, so I would really
appreciate examples as opposed to just an explanation of the code.
Here are the details for the questions:

1) I've defined a DAG for a workflow that will ONLY be triggered
externally (i.e. manually). In other words, I do not intend to use the
scheduler to run the workflow, or to set any sort of schedule interval
for running or backfilling. However, since the workflow is complex and
long-running, I still want all of the other functionality that Airflow
provides: initiating tasks with the correct dependencies, starting
concurrent tasks where possible, keeping track of the state of each
run in the database, and so on.

My basic understanding is that this is achievable by setting the
schedule_interval in the DAG object to None, and then using the
"airflow trigger_dag" command from the CLI. However, the observed
behavior when doing this is that the state of the DAG run does go to
running (I can see it as "running" in the DAG runs page in the UI),
but none of the tasks are actually started (the task instances table
in the UI remains empty, and the state of the tasks doesn't change in
the graph view). This happens both with my own DAG and with the
example_trigger_target_dag.py example.

The airflow configuration is set to use the LocalExecutor, and the
metadata database has been initialized with "airflow initdb". The full
CLI command is the following:

airflow trigger_dag -r run1 -c '{"batch":"test1"}' batch_test
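
For reference, as far as I understand it, the executor is set in
airflow.cfg under [core]; the relevant lines in my config look
essentially like this (actual connection string omitted):

[core]
# Run task instances as parallel local processes
executor = LocalExecutor
# Metadata database initialized with "airflow initdb"
sql_alchemy_conn = ...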

Here are the main parts of the DAG definition .py file:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data-dev',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 13),
    'email': ['andres.quiroz@plumlending.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

# schedule_interval=None so the scheduler never runs this DAG on its
# own; it should only run when triggered externally
dag = DAG('batch_test', default_args=default_args, schedule_interval=None)

# Jinja-templated bash command; "batch" is expected to come from the
# --conf JSON passed to trigger_dag, via the dag_run template variable
bash_op = """
echo 'Generating {{ dag_run.conf["batch"] }}_out'
touch {{ dag_run.conf["batch"] }}_out.csv
"""

t1 = BashOperator(
    task_id='generate_output',
    bash_command=bash_op,
    params={},
    dag=dag)
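
For clarity, with the trigger command above ("batch" = "test1"), I
would expect that template to render as:

echo 'Generating test1_out'
touch test1_out.csv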

2) As you can see from the CLI command and code snippet, I am trying
to pass a "batch" parameter for the DAG run, following the
instructions in the documentation and some forum posts I have seen.
However, this method is not working: the dag_run object is empty
(None) when the BashOperator tries to access conf.

I would also note that issues (1) and (2) seem to be independent,
because I tried the trigger_dag command both on a workflow that
doesn't use the conf parameter and on one that uses it but handles the
empty object case (i.e. example_trigger_target_dag.py). Both of them
failed to start any tasks, but the latter did render the bash command,
albeit with an empty message, which tells me the dag_run object was
empty there as well.
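
For what it's worth, the way example_trigger_target_dag.py seems to
guard against a missing dag_run in its bash task is roughly the
following (surrounding DAG boilerplate omitted), which would explain
why the command still renders, just with an empty message:

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Here is the message: '
                 '\'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
    dag=dag)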

Thanks again and apologies for the lengthy message; I have been
working on this for some time without success.

Regards,

Andrés
