airflow-dev mailing list archives

From Rajesh Chamarthi <rajesh.chamar...@gmail.com>
Subject Airflow DAG deadlock, "SKIPPED" state not cascading
Date Wed, 14 Jun 2017 03:51:26 GMT
I currently have a DAG that follows this pattern:

short_circuit_operator -> s3_sensor -> downstream_task_1 -> downstream_task_2
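For concreteness, a minimal sketch of that DAG shape against the Airflow 1.x APIs; the task ids, schedule, and S3 key below are placeholders, not the actual DAG in question:

```python
# Hypothetical reconstruction of the pattern above; the callable returns
# False so the ShortCircuitOperator skips its downstream tasks.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.sensors import S3KeySensor

dag = DAG("test_inbound",
          start_date=datetime(2017, 6, 5),
          schedule_interval="@daily")

check = ShortCircuitOperator(
    task_id="short_circuit",
    python_callable=lambda: False,  # False -> skip everything downstream
    dag=dag)

sensor = S3KeySensor(
    task_id="on_s3_xyz",
    bucket_key="s3://my-bucket/some/key",  # placeholder key
    dag=dag)

down_1 = DummyOperator(task_id="downstream_task_1", dag=dag)
down_2 = DummyOperator(task_id="downstream_task_2", dag=dag)

check >> sensor >> down_1 >> down_2
```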

When the short circuit condition evaluates to false, s3_sensor is skipped, but the other downstream task states remain at None and the DAG run fails.

A couple of questions:

1) Which component of the application (scheduler/operator/?) takes care of cascading the skipped status to downstream tasks? The ShortCircuitOperator only seems to update its immediate downstream tasks.

2) Using the CeleryExecutor seems to trigger this. Are there any other logs I can check, or processes I can run, to track down the root of the problem?

More details below

* ShortCircuitOperator log (the first downstream task is set to skipped, although the log shows a warning):

```
[2017-06-12 09:00:24,552] {base_task_runner.py:95} INFO - Subtask:
[2017-06-12 09:00:24,552] {python_operator.py:177} INFO - Skipping task:
on_s3_xyz
[2017-06-12 09:00:24,553] {base_task_runner.py:95} INFO - Subtask:
[2017-06-12 09:00:24,553] {python_operator.py:188} WARNING - Task
<Task(S3KeySensor): on_s3_xyz> was not part of a dag run. This should not
happen.
```

* Scheduler log (marks the DAG run as failed):

```
[2017-06-13 17:57:20,983] {models.py:4184} DagFileProcessor43 INFO -
Deadlock; marking run <DagRun test_inbound @ 2017-06-05 09:00:00:
scheduled__2017-06-05T09:00:00, externally triggered: False> failed
```

When I check the DAG run and step through the code, it looks like the trigger rule evaluates to false because the upstream task is "skipped":

```
Previous Dagrun State True The task did not have depends_on_past set.
Not In Retry Period True The task instance was not marked for retrying.
Trigger Rule False Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'failed': 0, 'successes': 0, 'skipped': 1, 'done': 1,
'upstream_failed': 0}, upstream_task_ids=['on_s3_xyz']
```
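The dependency check in that output can be approximated in plain Python. This is only a sketch of how the 'all_success' trigger rule counts upstream states, not Airflow's actual implementation, but it reproduces the counts in the log above:

```python
def all_success_satisfied(upstream_states):
    """Sketch of the 'all_success' trigger rule: every upstream task
    instance must be in the 'success' state. A 'skipped' upstream counts
    as non-success, which is why the downstream tasks here cannot run
    even though nothing failed."""
    counts = {
        "successes": sum(1 for s in upstream_states if s == "success"),
        "skipped": sum(1 for s in upstream_states if s == "skipped"),
        "failed": sum(1 for s in upstream_states if s == "failed"),
        "upstream_failed": sum(1 for s in upstream_states if s == "upstream_failed"),
    }
    # 'done' = upstream tasks that have reached any terminal state
    counts["done"] = sum(counts.values())
    satisfied = counts["successes"] == len(upstream_states)
    return satisfied, counts

# One upstream task (the sensor) in state 'skipped', as in the log:
ok, counts = all_success_satisfied(["skipped"])
# ok is False; counts mirrors the upstream_tasks_state line:
# {'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}
```

Since every downstream task with the default 'all_success' rule is unrunnable but the run itself has no failures, the scheduler sees no progress possible and declares the deadlock shown above.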
