airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kengo Seki (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (AIRFLOW-1674) Fix TriggerDagRunOperator to pass execution_date to the target
Date Tue, 10 Oct 2017 09:12:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Kengo Seki reassigned AIRFLOW-1674:
-----------------------------------

    Assignee: Kengo Seki

> Fix TriggerDagRunOperator to pass execution_date to the target
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-1674
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1674
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>            Reporter: Kengo Seki
>            Assignee: Kengo Seki
>
> I ran example_trigger_target_dag via example_trigger_controller_dag with SQLite as backend.
> example_trigger_target_dag has two tasks i.e. run_this and bash_task, but the former
failed with the following error:
> {code}
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,554]
{models.py:1563} ERROR - 'NoneType' object has no attribute 'conf'
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent
call last):
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/models.py",
line 1461, in _run_raw_task
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py",
line 89, in execute
> [2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask:     return_value =
self.execute_callable()
> [2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py",
line 94, in execute_callable
> [2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args,
**self.op_kwargs)
> [2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/example_dags/example_trigger_target_dag.py",
line 51, in run_this_func
> [2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask:     print("Remotely
received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
> [2017-10-03 10:31:13,557] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType'
object has no attribute 'conf'
> [2017-10-03 10:31:13,557] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,555]
{models.py:1592} INFO - Marking task as FAILED.
> [2017-10-03 10:31:13,567] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,567]
{models.py:1612} ERROR - 'NoneType' object has no attribute 'conf'
> [2017-10-03 10:31:13,567] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent
call last):
> [2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/.virtualenvs/a/bin/airflow",
line 6, in <module>
> [2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask:     exec(compile(open(__file__).read(),
__file__, 'exec'))
> [2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/bin/airflow",
line 27, in <module>
> [2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
> [2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/bin/cli.py",
line 394, in run
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/utils/db.py",
line 50, in wrapper
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:     result = func(*args,
**kwargs)
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/models.py",
line 1461, in _run_raw_task
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
> [2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py",
line 89, in execute
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask:     return_value =
self.execute_callable()
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py",
line 94, in execute_callable
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args,
**self.op_kwargs)
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask:   File "/home/sekikn/dev/incubator-airflow/airflow/example_dags/example_trigger_target_dag.py",
line 51, in run_this_func
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask:     print("Remotely
received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
> [2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType'
object has no attribute 'conf'
> {code}
> The latter seemed to succeed apparently, but the expected message was not passed in reality.
> {code}
> [2017-10-03 10:31:07,643] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:07,643]
{bash_operator.py:95} INFO - Here is the message:
> {code}
> It seems to cause this problem that execution_dates are different between example_trigger_target_dag
and its tasks.
> {code}
> sqlite> select dag_id, execution_date from dag_run where dag_id='example_trigger_target_dag';
> example_trigger_target_dag|2017-10-03 14:31:00
> sqlite> select task_id, execution_date from task_instance where dag_id='example_trigger_target_dag';
> bash_task|2017-10-03 14:31:00.000000
> run_this|2017-10-03 14:31:00.000000
> {code}
> Passing execution_date to DAG#create_dagrun in TriggerDagRunOperator#execute will resolve
this problem, as Justin shows in the following mail:
> https://lists.apache.org/thread.html/e369f7f7c3dbe0b673a785f939d5259b45da5cdb923cb3415aa7e410@%3Cdev.airflow.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message