airflow-dev mailing list archives

From Dmitriy Krasnikov <dkrasni...@hotmail.com>
Subject RE: Running a DAG that is only triggered externally (i.e. manually)
Date Fri, 16 Dec 2016 14:53:53 GMT
Whether DAGs start out paused or unpaused is set in airflow.cfg, something like an `initial state` setting. I had to
change it in our installations for newly loaded DAGs to run.
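
For reference, it looks roughly like this in our config (going from memory, so double-check the exact key name against your version's default airflow.cfg):

[core]
# If True, DAGs start out paused when first loaded and will not run,
# even when triggered, until they are unpaused. Set to False so newly
# loaded DAGs run as soon as they are triggered.
dags_are_paused_at_creation = False
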
The second issue is just a UI refresh issue. If you cannot wait, click the refresh button for the DAG; it's
on the right side.

-----Original Message-----
From: Andres Quiroz [mailto:andres.quiroz@plumlending.com] 
Sent: Friday, December 16, 2016 9:41 AM
To: dev@airflow.incubator.apache.org
Subject: Re: Running a DAG that is only triggered externally (i.e. manually)

Hi Max, thanks a lot for your reply!

I was indeed running the scheduler, but what finally worked to get it to start the tasks after
calling "airflow trigger_dag" was "unpausing"
the DAG from the UI. A couple questions about this:

- It looks like DAGs are paused initially by default. Is there any way to set the property
from the DAG definition (.py) file, or does it have to be set from the UI or the CLI after
loading the DAG?
- The toggle switch for pausing and unpausing the DAG does not show up in the UI initially
for newly created DAGs, and I didn't notice exactly what I did before it started to show up
(i.e. what caused it to appear). Is there a specific condition under which the toggle switch
is enabled for a new DAG?

Once the DAG tasks did run, the conf argument passing worked as advertised :-)

Thanks again,

Andrés


On Thu, Dec 15, 2016 at 11:51 AM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> Hi Andres, welcome to the community!
>
> A few related points:
> * I confirm, schedule_interval=None is the way to go
> * Are you running a scheduler? The scheduler is also in charge of
> starting tasks, even for externally triggered DagRuns
> * There's an operator that can emit DagRuns; typically these triggers
> would run in another DAG on a short `schedule_interval` (say 5
> minutes), and on each run evaluate whether they should trigger a
> DagRun or not, and pass conf parameters if needed (a rough sketch
> follows this list):
> https://airflow.incubator.apache.org/code.html#airflow.operators.TriggerDagRunOperator
> * you can also use the ORM to kick off new DagRuns programmatically
> (from airflow.models import DagRun)
> * you can also use the UI to create DagRuns, though since the `conf`
> is a Pickle, you can't really see what's in there in the UI. It would
> be a nice feature to show `conf` in the UI
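>
> A rough sketch of such a controller DAG (untested, modeled on the
> example_trigger_target_dag.py / controller example; the DAG ids and
> payload below are just placeholders):
>
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.dagrun_operator import TriggerDagRunOperator
>
> def decide_to_trigger(context, dag_run_obj):
>     # Return the dag_run_obj to fire a DagRun on the target DAG, or
>     # None to skip this tick; dag_run_obj.payload shows up as
>     # dag_run.conf in the triggered DAG.
>     dag_run_obj.payload = {"batch": "test1"}
>     return dag_run_obj
>
> controller = DAG(
>     'batch_test_controller',
>     start_date=datetime(2016, 12, 13),
>     schedule_interval='*/5 * * * *')
>
> trigger = TriggerDagRunOperator(
>     task_id='trigger_batch_test',
>     trigger_dag_id='batch_test',
>     python_callable=decide_to_trigger,
>     dag=controller)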
>
> As to debugging your issue I'd just print `dag_run.conf` to see what's 
> in there. My wetware bash interpreter doesn't know whether the quoting 
> in your `-c` arg is ok or not...
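>
> For instance, a throwaway task along these lines (hypothetical;
> assuming your DAG object is named `dag`) would show whether the conf
> made it through:
>
> debug = BashOperator(
>     task_id='show_conf',
>     bash_command="echo 'conf is: {{ dag_run.conf }}'",
>     dag=dag)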
>
> Max
>
> On Wed, Dec 14, 2016 at 1:23 PM, Andres Quiroz < 
> andres.quiroz@plumlending.com> wrote:
>
>> Hello,
>>
>> I am new to Airflow and the mailing list, so please feel free to 
>> re-direct me to the appropriate channel if this is not the right one 
>> for my questions. I will state the questions first, in order of
>> importance, and then elaborate below. Thanks in advance.
>>
>> 1) Is setting "schedule_interval" to None in a DAG definition and 
>> using the "airflow trigger_dag" command from the CLI with the 
>> corresponding dag_id the correct way to trigger a DAG manually, so 
>> that all of its tasks are executed? If correct, are these conditions 
>> sufficient or is something missing?
>>
>> 2) How do I access the conf parameter passed to the trigger_dag 
>> command in a BashOperator? (i.e. airflow trigger_dag --conf 
>> '{"key":"value"}' dag_id)
>>
>> Also, please note I am a relative Python newbie, so I would really 
>> appreciate examples as opposed to just an explanation about the code.
>> Here are the details for the questions:
>>
>> 1) I've defined a DAG for a workflow that will ONLY be triggered 
>> externally (i.e. manually). In other words, I do not intend to use 
>> the scheduler to run the workflow, or set any sort of scheduling 
>> interval for running or backfilling, etc. However, since the workflow 
>> is complex and long-running, all of the other functionality that 
>> Airflow provides, like initiating tasks with the correct dependencies 
>> and starting concurrent tasks if possible, keeping track of the state 
>> of each run in the database, etc., is desired.
>>
>> My basic understanding is that this is achievable by setting the 
>> schedule_interval in the DAG object to None, and then using the 
>> "airflow trigger_dag" command from the CLI. However, the observed 
>> behavior when doing this is that the state for the DAG does go to 
>> running (I can see it as "running" in the DAG runs page in the UI), 
>> but none of the tasks are actually started (the task instances table 
>> in the UI remains empty, and the state of the tasks doesn't change in 
>> the graph view). This happens both when running my own DAG as well as 
>> the example_trigger_target_dag.py example.
>>
>> The airflow configuration is set to use the LocalExecutor, and the 
>> database has been initialized. The full CLI command is the following:
>>
>> airflow trigger_dag -r run1 -c '{"batch":"test1"}' batch_test
>>
>> Here are the main parts of the DAG definition .py file:
>>
>> from datetime import datetime, timedelta
>>
>> from airflow import DAG
>> from airflow.operators.bash_operator import BashOperator
>>
>> default_args = {
>>     'owner': 'data-dev',
>>     'depends_on_past': False,
>>     'start_date': datetime(2016, 12, 13),
>>     'email': ['andres.quiroz@plumlending.com'],
>>     'email_on_failure': False,
>>     'email_on_retry': False,
>>     'retries': 0,
>>     'retry_delay': timedelta(minutes=5),
>> }
>>
>> dag = DAG('batch_test', default_args=default_args, 
>> schedule_interval=None)
>>
>> bash_op = """
>> echo 'Generating {{ dag_run.conf["batch"] }}_out'
>> touch {{ dag_run.conf['batch'] }}_out.csv """
>>
>> t1 = BashOperator(
>>     task_id='generate_output',
>>     bash_command=bash_op,
>>     params={},
>>     dag=dag)
>>
>> 2) As you can see from the CLI command and code snippet, I am trying 
>> to pass a "batch" parameter for the DAG run, following the 
>> instructions in the documentation and some forum posts I have seen.
>> However, this method is not working, since the dag_run object is
>> empty (None) when the bash operator tries to access conf.
>>
>> I would also note that issues (1) and (2) are unrelated, because I 
>> tried the trigger_dag command on a workflow that doesn't use the conf 
>> parameter and on one that uses it, but handles the empty object case 
>> (i.e. example_trigger_target_dag.py). Both of them failed to start
>> any tasks; the latter did render the bash command, but with an
>> empty message, which meant the dag_run object was also empty.
>>
>> Thanks again and apologies for the lengthy message; I have been 
>> working on this for some time without success.
>>
>> Regards,
>>
>> Andrés
>>