airflow-dev mailing list archives

From "Miller, Robin" <Robin.Mil...@affiliate.oliverwyman.com>
Subject Re: Issue with latest versions of Celery & Kombu
Date Mon, 14 Nov 2016 11:56:11 GMT
Hi Nadeem,


We are using Celery with RabbitMQ, and the upgrade to Celery 4.0 last week caused problems
for us there as well. The RabbitMQ web interface started giving us errors pointing to the use
of non-UTF-8 characters in message queues (or queue names, it wasn't clear), which RabbitMQ
does not support.


We've also solved this by moving back to version 3.1.15 of Celery. We didn't investigate further,
since this solved our problem, but I suspect that there are a fair few people who've been
tripped up by this, or will be.


Regards,

Robin Miller
OLIVER WYMAN
robin.miller@affiliate.oliverwyman.com
www.oliverwyman.com

________________________________
From: Nadeem Ahmed Nazeer <nazeer@neon-lab.com>
Sent: 12 November 2016 08:21:02
To: dev@airflow.incubator.apache.org
Cc: Nadeem Ahmed
Subject: Issue with latest versions of Celery & Kombu

Hi Airflowers,

We install Airflow from our Chef scripts and are currently using Airflow
1.7.1.3. We rebase Airflow once in a while to reduce the number of
backfills: we tear everything down and bring it back up. It worked fine
every time until today.

During today's rebase, Celery was unable to talk to the backend database
through SQLAlchemy. We had made no changes to our Chef setup scripts.

2016-11-12 07:34:14,386 ERROR:airflow.jobs.SchedulerJob[MainThread] u'No such transport: sqla'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 755, in _execute
    executor.heartbeat()
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 99, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 66, in execute_async
    args=[command], queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
    **options
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 714, in send_task
    with self.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/objects.py", line 85, in __enter__
    *self.fb_args, **self.fb_kwargs
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 62, in prepare
    p = p()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer
    conn = self._acquire_connection()
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 39, in _acquire_connection
    return self.connections.acquire(block=True)
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 936, in prepare
    resource = resource()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 908, in new
    return self.connection.clone()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 587, in clone
    return self.__class__(**dict(self._info(resolve=False), **kwargs))
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 597, in _info
    D = self.transport.default_connection_params
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 832, in transport
    self._transport = self.create_transport()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 576, in create_transport
    return self.get_transport_cls()(client=self)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 582, in get_transport_cls
    transport_cls = get_transport_cls(transport_cls)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 81, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 62, in resolve_transport
    raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: sqla'
2016-11-12 07:34:14,399 ERROR:airflow.jobs.SchedulerJob[MainThread] Tachycardia!

Running pip install for airflow[celery] now pulls in celery 4.0.0 and
kombu 4.0.0. On checking further, we found that the latest version of kombu
no longer supports SQLAlchemy as a broker:
https://github.com/celery/kombu/commit/1cd4e07f9ebb2fdbde0f86054e963f6bbd17e698#diff-7b8e685a0148804c7b353dc3f138d189
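
For anyone who wants to check a setup before re-running an install, here is a
minimal sketch of that reasoning in Python. It is not part of Airflow, Celery
or kombu: the function names, the REMOVED_IN_KOMBU_4 set and the example broker
URL are all hypothetical, and the only fact it encodes is the one reported
above, namely that the "sqla" transport is absent from kombu 4.0.0.

```python
# Hypothetical helper: flags a broker URL that relies on the "sqla"
# transport when the installed kombu major version is 4 or later.
# Assumption taken from this thread: "sqla" is missing in kombu 4.0.0.
REMOVED_IN_KOMBU_4 = {"sqla"}


def transport_name(broker_url):
    """Return the transport part of a broker URL, e.g. 'sqla' for 'sqla+mysql://...'."""
    scheme = broker_url.split("://", 1)[0]
    return scheme.split("+", 1)[0].lower()


def major_version(version):
    """Return the leading integer of a dotted version string such as '4.0.0'."""
    return int(version.split(".", 1)[0])


def broker_breaks_on_upgrade(broker_url, kombu_version):
    """True if the URL uses a transport assumed to be removed in this kombu version."""
    return (
        transport_name(broker_url) in REMOVED_IN_KOMBU_4
        and major_version(kombu_version) >= 4
    )


if __name__ == "__main__":
    # Example URL is made up for illustration only.
    url = "sqla+mysql://user:password@localhost:3306/airflow"
    for version in ("3.0.37", "4.0.0"):
        print(version, broker_breaks_on_upgrade(url, version))
```

The check only looks at the URL scheme and the version string, so it can run
in a provisioning script before any Celery worker is started.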

As a hotfix, we manually downgraded Celery to 3.1.15, which uses kombu
3.0.37. That version defines sqla in the TRANSPORT_ALIASES dict of
kombu/transport/__init__.py
(http://docs.celeryproject.org/projects/kombu/en/latest/_modules/kombu/transport.html),
whereas the __init__.py in kombu 4.0.0 no longer has it.
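
The same difference can be probed at runtime. The sketch below is only an
illustration: transport_available is a hypothetical helper, and it assumes
that the installed kombu exposes the TRANSPORT_ALIASES dict in
kombu.transport, as described above. If kombu is not importable it simply
reports the transport as unavailable.

```python
def transport_available(name, aliases=None):
    """Return True if `name` is a key in the given transport alias mapping.

    When `aliases` is None, the mapping is read from the installed kombu
    (kombu.transport.TRANSPORT_ALIASES); a missing kombu counts as unavailable.
    """
    if aliases is None:
        try:
            from kombu.transport import TRANSPORT_ALIASES as aliases
        except ImportError:
            return False
    return name in aliases


if __name__ == "__main__":
    # Result depends on which kombu version is installed, if any.
    print("sqla transport available:", transport_available("sqla"))
```

Passing an explicit mapping makes the helper usable without kombu installed,
for example transport_available("sqla", {"amqp": "..."}) evaluates to False.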

Is this a known issue, or has something changed in Celery that we now
have to account for?

Please help. I would like your advice before I make any changes to the chef
scripts for airflow setup.

Thanks,
Nadeem
