airflow-commits mailing list archives

From "Laurent Bonafons (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-1086) Fail to execute task with upstream dependency in subdag
Date Tue, 25 Apr 2017 11:27:04 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Laurent Bonafons updated AIRFLOW-1086:
--------------------------------------
    Affects Version/s: 1.8.0

> Fail to execute task with upstream dependency in subdag
> -------------------------------------------------------
>
>                 Key: AIRFLOW-1086
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1086
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, subdag
>    Affects Versions: Airflow 1.8, 1.8.0
>            Reporter: Laurent Bonafons
>         Attachments: test_bubdag_task_instances.png, test_subdag_graph.png, test_subdag.py
>
>
> Hello,
> We have been migrating from Airflow v1.7.1.3 to v1.8.0, and since the upgrade we can no longer run subdags. We use the CeleryExecutor with RabbitMQ as the backend.
> I tested increasingly simplified cases and ended up with the excellent "test_subdag" example from Joe Schmid (see attachment).
> It still doesn't work: in a subdag, only the first tasks (the ones without upstream dependencies) run.
> When a task in a subdag succeeds, its downstream tasks are never executed, even though the subdag log shows "Dependencies all met" for them.
> This looks similar to AIRFLOW-955 ("job failed to execute tasks") reported by Jeff Liu, but here there is no second level of nesting; it is just a subdag containing tasks.
> Here is an example of a subdag log in v1.7.1.3:
> {noformat}
> [2017-04-06 12:11:33,648] {models.py:154} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
> [2017-04-06 12:11:35,052] {models.py:154} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
> [2017-04-06 12:11:35,125] {models.py:1196} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2017-04-06 12:11:35,136] {models.py:1219} INFO - Executing <Task(SubDagOperator): SubDagOp> on 2017-04-03 00:00:00
> [2017-04-06 12:11:35,165] {base_executor.py:36} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-06 12:11:40,014] {sequential_executor.py:26} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-06 12:11:46,176] {jobs.py:934} INFO - Task instance ('Test_SubDAG.SubDagOp', 'SubDAG_Task1', datetime.datetime(2017, 4, 3, 0, 0)) succeeded
> [2017-04-06 12:11:46,176] {jobs.py:997} INFO - [backfill progress] | waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0
> [2017-04-06 12:11:46,185] {base_executor.py:36} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task2 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-06 12:11:46,195] {sequential_executor.py:26} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task2 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-06 12:11:52,177] {jobs.py:934} INFO - Task instance ('Test_SubDAG.SubDagOp', 'SubDAG_Task2', datetime.datetime(2017, 4, 3, 0, 0)) succeeded
> [2017-04-06 12:11:52,177] {jobs.py:997} INFO - [backfill progress] | waiting: 0 | succeeded: 2 | kicked_off: 2 | failed: 0 | skipped: 0 | deadlocked: 0
> [2017-04-06 12:11:52,178] {jobs.py:1026} INFO - Backfill done. Exiting.
> {noformat}
> And here is the same log in v1.8.0:
> {noformat}
> [2017-04-05 16:17:51,854] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
> [2017-04-05 16:17:51,996] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run Test_SubDAG SubDagOp 2017-04-04T00:00:00 --job_id 9987 --raw -sd DAGS_FOLDER/tricky_test_3.py']
> [2017-04-05 16:17:52,803] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,803] {__init__.py:57} INFO - Using executor CeleryExecutor
> [2017-04-05 16:17:52,917] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,917] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-05 16:17:52,957] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,956] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-05 16:17:53,262] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,262] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
> [2017-04-05 16:17:53,401] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,400] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp 2017-04-04 00:00:00 [queued]>
> [2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,409] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp 2017-04-04 00:00:00 [queued]>
> [2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,409] {models.py:1318} INFO -
> [2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
> [2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
> [2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
> [2017-04-05 16:17:53,410] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-05 16:17:53,443] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,442] {models.py:1342} INFO - Executing <Task(SubDagOperator): SubDagOp> on 2017-04-04 00:00:00
> [2017-04-05 16:17:53,553] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,552] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task1 2017-04-04 00:00:00 [scheduled]>
> [2017-04-05 16:17:53,559] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,558] {base_executor.py:50} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-04T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-05 16:17:53,644] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,644] {models.py:1120} INFO - Dependencies not met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': Decimal('0'), 'failed': Decimal('0'), 'upstream_failed': Decimal('0'), 'skipped': Decimal('0'), 'done': 0L}, upstream_task_ids=['SubDAG_Task1']
> [2017-04-05 16:17:58,477] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:58,477] {sequential_executor.py:40} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-04T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
> [2017-04-05 16:17:59,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,284] {__init__.py:57} INFO - Using executor CeleryExecutor
> [2017-04-05 16:17:59,399] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,398] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-05 16:17:59,438] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,438] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-05 16:18:04,975] {base_task_runner.py:95} INFO - Subtask: Logging into: /usr/local/airflow/logs/Test_SubDAG.SubDagOp/SubDAG_Task1/2017-04-04T00:00:00
> [2017-04-05 16:18:04,997] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:04,997] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
> [2017-04-05 16:18:05,028] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,028] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
> [2017-04-05 16:18:05,043] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,043] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
> [2017-04-05 16:18:05,060] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,060] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
> [2017-04-05 16:18:05,078] {base_task_runner.py:95} INFO - Subtask: /usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
> [2017-04-05 16:18:05,078] {base_task_runner.py:95} INFO - Subtask:   'strategies for improved performance.' % expr)
> [2017-04-05 16:18:05,081] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,080] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
> [2017-04-05 16:18:05,095] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,095] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
> [2017-04-05 16:18:10,071] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,070] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
> [2017-04-05 16:18:10,092] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,091] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
> [2017-04-05 16:18:10,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,105] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
> [2017-04-05 16:18:15,080] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,080] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
> [2017-04-05 16:18:15,101] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,100] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
> [2017-04-05 16:18:15,116] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,116] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
> [2017-04-05 16:18:20,091] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,090] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
> [2017-04-05 16:18:20,112] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,111] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
> [2017-04-05 16:18:20,127] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,126] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
> {noformat}
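The "Trigger Rule" failure at 16:17:53,644 was logged before SubDAG_Task1 had run, so it is expected on its own. The hypothetical Python sketch below (a simplified model, not Airflow's implementation; the function names are invented for illustration) shows how an 'all_success' check reads the two fields printed in that line, `upstream_tasks_state` and `upstream_task_ids`. With one upstream task and zero successes it reports one non-success; once the upstream task is counted as a success, the check passes. That outcome matches the repeated "Dependencies all met ... SubDAG_Task2 ... [None]" lines, which suggests the stall happens after dependency evaluation, since `kicked_off` stays at 0.

```python
from decimal import Decimal
from typing import Dict, List

# Hypothetical, simplified model of an 'all_success' trigger-rule check.
# It is NOT Airflow's code; it only mirrors the fields the 1.8.0 log prints.


def count_non_successes(upstream_task_ids: List[str],
                        upstream_tasks_state: Dict[str, Decimal]) -> int:
    """Number of upstream tasks not (yet) counted as successful."""
    return int(len(upstream_task_ids) - upstream_tasks_state["successes"])


def all_success_met(upstream_task_ids: List[str],
                    upstream_tasks_state: Dict[str, Decimal]) -> bool:
    """'all_success' holds only when every upstream task succeeded."""
    return count_non_successes(upstream_task_ids, upstream_tasks_state) == 0


def describe(upstream_task_ids: List[str],
             upstream_tasks_state: Dict[str, Decimal]) -> str:
    """Build a message shaped like the one in the log (wording mirrored from it)."""
    n = count_non_successes(upstream_task_ids, upstream_tasks_state)
    if n == 0:
        return "Dependencies all met"
    return ("Task's trigger rule 'all_success' requires all upstream tasks "
            f"to have succeeded, but found {n} non-success(es).")


if __name__ == "__main__":
    upstream = ["SubDAG_Task1"]
    # State logged at 16:17:53,644, before SubDAG_Task1 had run.
    before = {"successes": Decimal("0"), "failed": Decimal("0"),
              "upstream_failed": Decimal("0"), "skipped": Decimal("0"),
              "done": 0}
    # State one would expect once SubDAG_Task1 has succeeded.
    after = dict(before, successes=Decimal("1"), done=1)
    print(describe(upstream, before))
    print(describe(upstream, after))
```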
> Also attached:
> - test_subdag_graph.png shows what appears in the Graph View.
> - test_bubdag_task_instances.png shows what is listed when browsing "Task Instances".
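In the 1.8.0 log, the backfill loop prints the same counters roughly every five seconds (tasks waiting: 1, succeeded: 1, kicked_off: 0, not ready: 0) without ever queueing SubDAG_Task2. A hypothetical Python helper such as the one below could flag that pattern when scanning other subdag logs. The regex, the field names (copied from the 1.8.0 "[backfill progress]" lines above) and the three-repeat threshold are assumptions for illustration, not part of Airflow.

```python
import re
from typing import Dict, List

# Counters as printed by the 1.8.0 "[backfill progress]" lines above.
# Assumption: other versions may differ (the 1.7.1.3 log uses "waiting:").
COUNTER_RE = re.compile(
    r"(tasks waiting|succeeded|kicked_off|failed|skipped|deadlocked|not ready): (\d+)"
)


def parse_progress(line: str) -> Dict[str, int]:
    """Extract the named counters from one '[backfill progress]' line."""
    return {name: int(value) for name, value in COUNTER_RE.findall(line)}


def looks_stalled(log_lines: List[str], repeats: int = 3) -> bool:
    """Hypothetical heuristic: the last `repeats` progress snapshots are
    identical, tasks are still waiting, and nothing has been kicked off."""
    snapshots = [parse_progress(l) for l in log_lines if "[backfill progress]" in l]
    if len(snapshots) < repeats:
        return False
    tail = snapshots[-repeats:]
    last = tail[-1]
    return (all(s == last for s in tail)
            and last.get("tasks waiting", 0) > 0
            and last.get("kicked_off", 0) == 0)
```

Fed the five progress lines logged between 16:18:05 and 16:18:20, it would report a stall, because the last three snapshots are identical while a task is still waiting.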



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
