airflow-commits mailing list archives

From "Kamil Bregula (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-6527) Error sending Celery task:Timeout in send_task_to_executor
Date Fri, 10 Jan 2020 12:54:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012815#comment-17012815 ]

Kamil Bregula edited comment on AIRFLOW-6527 at 1/10/20 12:53 PM:
------------------------------------------------------------------

Here is a related Jira issue: https://issues.apache.org/jira/browse/AIRFLOW-6532


was (Author: kamil.bregula):
Here is related PR: https://issues.apache.org/jira/browse/AIRFLOW-6532

> Error sending Celery task:Timeout in send_task_to_executor
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-6527
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6527
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>
> We use Airflow with CeleryExecutor and a Redis broker. Our Airflow scheduler often encounters this {{AirflowTaskTimeout}} error.
> - This happens in {{send_task_to_executor()}}.
> - It only happens occasionally.
> - Retrying the failed task a few times always works.
> - This affects at least 1.10.6 and 1.10.7, and possibly other versions too.
> Possible cause:
> Our Airflow venv and dags_folder are on an NFS mount because we want to keep the various pieces of Airflow services in sync.
> The NFS mount can be slow at times. This slows down module imports, which in turn causes {{send_task_to_executor()}} to take more than 2 seconds.
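One way to check the slow-import hypothesis is to time a fresh import of a module on the affected host. The following is a minimal sketch, not part of Airflow or Celery; the helper name `time_cold_import` is hypothetical. It only clears Python's own module cache, so the OS page cache and NFS attribute cache may still make repeated measurements look faster than the first one.

```python
import importlib
import sys
import time


def time_cold_import(module_name):
    """Return the seconds spent importing module_name after evicting it.

    Hypothetical diagnostic helper: it removes the module (and its
    submodules) from sys.modules so the import machinery has to locate
    and load the files again, which is where a slow NFS mount would hurt.
    """
    for name in list(sys.modules):
        if name == module_name or name.startswith(module_name + "."):
            del sys.modules[name]
    # Make sure the path finders re-scan directories instead of using cached listings.
    importlib.invalidate_caches()
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


if __name__ == "__main__":
    # "colorsys" is just a small stdlib module used as a stand-in here.
    print("import took {:.4f}s".format(time_cold_import("colorsys")))
```

On the scheduler host, calling it with the module seen in the traceback (for example `celery.app.amqp`) and comparing the result against the 2-second limit should indicate whether the import alone can exhaust the budget.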
> Other people with similar-looking problems:
> The following issue is now closed. It is not clear to me whether or how the user resolved it.
> https://github.com/bitnami/bitnami-docker-airflow-scheduler/issues/1
> Another user asked about this on the mailing list. The question has not been answered.
> https://www.mail-archive.com/dev@airflow.apache.org/msg01093.html
> Proposed workarounds:
> - Make this `timeout(seconds=2)` configurable, e.g. by adding a [celery]send_task_timeout option to airflow.cfg. Since 2 seconds seems too short, we can set it to something like 15 seconds to make the timeout much less likely.
> - Move the Airflow venv to the local disk. This makes it inconvenient to sync the Airflow installation across multiple hosts, though.
> {code}
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,763] {celery_executor.py:224}
ERROR - Error sending Celery task:Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: Celery Task ID: ('example_daily', 'example_sensor1',
datetime.datetime(2020, 1, 9, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
1)
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py",
line 42, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: return obj.__dict__[self.__name__]
> Jan 09 22:46:59 scheduler_host airflow[18882]: KeyError: 'amqp'
> Jan 09 22:46:59 scheduler_host airflow[18882]: During handling of the above exception,
another exception occurred:
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py",
line 118, in send_task_to_executor
> Jan 09 22:46:59 scheduler_host airflow[18882]: result = task.apply_async(args=[command],
queue=queue)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/task.py",
line 570, in apply_async
> Jan 09 22:46:59 scheduler_host airflow[18882]: **options
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py",
line 712, in send_task
> Jan 09 22:46:59 scheduler_host airflow[18882]: amqp = self.amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py",
line 44, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: value = obj.__dict__[self.__name__] =
self.__get(obj)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py",
line 1202, in amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: return instantiate(self.amqp_cls, app=self)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/utils/imports.py",
line 55, in instantiate
> Jan 09 22:46:59 scheduler_host airflow[18882]: return symbol_by_name(name)(*args, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/imports.py",
line 57, in symbol_by_name
> Jan 09 22:46:59 scheduler_host airflow[18882]: module = imp(module_name, package=package,
**kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/usr/lib/python3.6/importlib/__init__.py",
line 126, in import_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: return _bootstrap._gcd_import(name[level:],
package, level)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 994, in _gcd_import
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 955, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 665, in _load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 678, in exec_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 219, in _call_with_frames_removed
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/amqp.py",
line 23, in <module>
> Jan 09 22:46:59 scheduler_host airflow[18882]: from . import routes as _routes
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 951, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>",
line 894, in _find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 1157, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 1129, in _get_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 1271, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 96, in _path_isfile
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 88, in _path_is_mode_type
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>",
line 82, in _path_stat
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py",
line 43, in handle_timeout
> Jan 09 22:46:59 scheduler_host airflow[18882]: raise AirflowTaskTimeout(self.error_message)
> Jan 09 22:46:59 scheduler_host airflow[18882]: airflow.exceptions.AirflowTaskTimeout:
Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,764] {celery_executor.py:224}
ERROR - Error sending Celery task:Timeout, PID: 27725
> {code}
> The error originates in the following code, where {{timeout(seconds=2)}} is hardcoded:
> {code:python}
> def send_task_to_executor(task_tuple):
>     key, simple_ti, command, queue, task = task_tuple
>     try:
>         with timeout(seconds=2):
>             result = task.apply_async(args=[command], queue=queue)
>     except Exception as e:
>         exception_traceback = "Celery Task ID: {}\n{}".format(key,
>                                                               traceback.format_exc())
>         result = ExceptionWithTraceback(e, exception_traceback)
>     return key, command, result
> {code}
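
The first proposed workaround (reading the limit from configuration instead of hardcoding it) could look roughly like the sketch below. It is an illustration under stated assumptions, not Airflow's implementation: `SendTimeout`, `load_send_task_timeout` and `send_with_timeout` are hypothetical names, the `[celery] send_task_timeout` option is the one suggested in the issue rather than a confirmed setting, and the `timeout` context manager is a simplified stand-in for the one in `airflow/utils/timeout.py`. It relies on `SIGALRM`, so it only works on Unix and in the main thread.

```python
import configparser
import signal
import time
from contextlib import contextmanager


class SendTimeout(Exception):
    """Hypothetical stand-in for AirflowTaskTimeout."""


@contextmanager
def timeout(seconds):
    """Raise SendTimeout if the guarded block runs longer than `seconds`."""
    def handle_timeout(signum, frame):
        raise SendTimeout("Timeout after {}s".format(seconds))

    previous = signal.signal(signal.SIGALRM, handle_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def load_send_task_timeout(cfg_text, default=2.0):
    """Read [celery] send_task_timeout, falling back to the old 2-second value."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getfloat("celery", "send_task_timeout", fallback=default)


def send_with_timeout(send, seconds):
    """Call send() under a time limit; return its result or the exception raised."""
    try:
        with timeout(seconds):
            return send()
    except Exception as e:
        return e


if __name__ == "__main__":
    limit = load_send_task_timeout("[celery]\nsend_task_timeout = 15\n")
    # A real caller would pass something like
    # lambda: task.apply_async(args=[command], queue=queue) here.
    print(limit, send_with_timeout(lambda: "sent", limit))
```

Keeping 2 seconds as the fallback leaves existing installations unchanged, while hosts with a slow shared filesystem can raise the limit without patching the executor.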



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
