airflow-dev mailing list archives

From Alex Wenckus <alex.wenc...@mainstreethub.com>
Subject Airflow bug when losing connectivity to Celery
Date Tue, 11 Jul 2017 21:16:44 GMT
We have several remote workers running on a server separate from our
scheduler. We are using the CeleryExecutor with Redis as the broker; Redis
runs alongside our scheduler.
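
For reference, the wiring is roughly the following. This is only a sketch:
the hostnames and values are illustrative, and these are shown as the
equivalent Celery-style settings rather than our exact airflow.cfg keys.

# Illustrative only -- hostnames and DB numbers are made up.
# The CeleryExecutor ends up handing Celery settings equivalent to:
CELERY_SETTINGS = {
    'broker_url': 'redis://scheduler-host:6379/0',      # Redis sits next to the scheduler
    'result_backend': 'redis://scheduler-host:6379/0',  # assumption: same Redis for results
}
# The workers run `airflow worker` on a separate machine and reach the
# scheduler host only over this Redis connection, which is the link that
# periodically drops.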

Periodically, we see messages in the workers' logs about losing
connectivity to Redis:

  File "/usr/local/lib/python2.7/site-packages/redis/connection.py", line
168, in readline
    self._read_from_socket()
  File "/usr/local/lib/python2.7/site-packages/redis/connection.py", line
143, in _read_from_socket
    (e.args,))
ConnectionError: Error while reading from socket: ('Connection closed by
server.',)

This causes Celery to retrigger any unacknowledged jobs:

[2017-07-11 12:42:42,420: WARNING/MainProcess] Restoring 11 unacknowledged message(s)
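
As far as we understand it, each queued task is just a Celery task that
shells out to an `airflow run` command, so a redelivered message simply
means a second `airflow run` process is started for the same task instance
while the first one is still alive. Roughly (paraphrasing from
airflow/executors/celery_executor.py in 1.8.1, so treat the exact code as
approximate, and the broker URL as illustrative):

import logging
import subprocess

from celery import Celery

from airflow.exceptions import AirflowException

app = Celery('airflow.executors.celery_executor',
             broker='redis://scheduler-host:6379/0')


@app.task
def execute_command(command):
    # `command` is an `airflow run ...` command string for one task
    # instance; a redelivered message re-runs the same command while the
    # original process may still be running.
    logging.info("Executing command in Celery: %s", command)
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError as e:
        logging.error(e)
        raise AirflowException('Celery command failed')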

For standard (non-subdag) tasks this appears to be a non-issue: the DAG can
continue, with the second triggered task failing. For tasks inside a subdag,
on the other hand, the redelivery appears to trigger an update to the task
instance's row in the database, which causes the SubDagOperator to see the
task instance as failed (even though the original task is still running).
Any tasks downstream of it are then marked as upstream_failed.

For many of our DAGs this is not a big problem, but for those that use
depends_on_past it obviously creates issues.
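
To make the shape of the affected DAGs concrete, here is a stripped-down
sketch; the DAG and task names are made up for illustration, not our real
convert_tables DAGs.

# Stripped-down sketch only -- names are illustrative, not our real DAGs.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,  # the setting that makes the spurious failure painful
    'start_date': datetime(2017, 7, 1),
}

PARENT_DAG = 'convert_tables_15m_example'
SCHEDULE = '*/15 * * * *'


def make_subdag(task_id):
    # The task inside this sub-DAG is the one whose database row gets
    # flipped to failed when Celery redelivers the message and a second
    # run starts.
    subdag = DAG(
        dag_id='%s.%s' % (PARENT_DAG, task_id),
        default_args=default_args,
        schedule_interval=SCHEDULE,
    )
    DummyOperator(task_id='collapse_table', dag=subdag)
    return subdag


dag = DAG(
    dag_id=PARENT_DAG,
    default_args=default_args,
    schedule_interval=SCHEDULE,
)

convert = SubDagOperator(
    task_id='convert_collapse_table',
    subdag=make_subdag('convert_collapse_table'),
    dag=dag,
)

# Once the SubDagOperator sees its inner task instance as failed, anything
# downstream here is marked upstream_failed, and with depends_on_past the
# next schedule interval is blocked as well.
downstream = DummyOperator(task_id='publish_results', dag=dag)
convert.set_downstream(downstream)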

Here is an example of the logs from one of the task instances when the
second job starts:

[2017-07-11 12:42:44,733] {models.py:168} INFO - Filling up the DagBag from /usr/lib/airflow/shared/dags/workflow/my_dags.py
[2017-07-11 12:42:45,671] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run convert_tables_15m_v1.convert_collapse_opportunity_line_item collapse_opportunity_line_item 2017-07-11T11:47:00 --job_id 132921 --raw -sd DAGS_FOLDER/convert_tables_dags.py']
[2017-07-11 12:42:46,232] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:46,232] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-07-11 12:42:47,436] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:47,435] {models.py:168} INFO - Filling up the DagBag from /usr/lib/airflow/shared/dags/workflow/convert_tables_dags.py
[2017-07-11 12:42:50,498] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:50,498] {models.py:1122} INFO - Dependencies not met for <TaskInstance: convert_tables_15m_v1.convert_collapse_opportunity_line_item.collapse_opportunity_line_item 2017-07-11 11:47:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-07-11 12:31:43.569564.
[2017-07-11 12:42:50,499] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:50,499] {models.py:1122} INFO - Dependencies not met for <TaskInstance: convert_tables_15m_v1.convert_collapse_opportunity_line_item.collapse_opportunity_line_item 2017-07-11 11:47:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2017-07-11 12:42:50,754] {jobs.py:2172} WARNING - Recorded pid 4769 is not a descendant of the current pid 6552
[2017-07-11 12:42:56,255] {jobs.py:2179} WARNING - State of this instance has been externally set to failed. Taking the poison pill. So long.


Is there anything we can do on the Airflow side to fix or work around this
issue when jobs are retriggered like this? I am not sure it is actually a
connectivity issue with our Redis instance, since not all of our worker
processes log `Connection closed by server.` at the same time. Regardless,
it seems like a situation Airflow should be able to handle gracefully.

We are on Airflow 1.8.1.

Thanks!
Alex
