airflow-dev mailing list archives

From Nadeem Ahmed Nazeer <naz...@neon-lab.com>
Subject Issue with latest versions of Celery & Kombu
Date Sat, 12 Nov 2016 08:21:02 GMT

Hi Airflowers,

We install Airflow from our Chef scripts and are currently on Airflow
1.7.1.3. Once in a while we re-base Airflow, tearing everything down and
bringing it back up, to reduce the number of backfills. This worked fine
every time until today.

During today's re-base, Celery could not talk to the backend database
through SQLAlchemy. We had made no changes to our Chef setup scripts.

2016-11-12 07:34:14,386 ERROR:airflow.jobs.SchedulerJob[MainThread] u'No such transport: sqla'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 755, in _execute
    executor.heartbeat()
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 99, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 66, in execute_async
    args=[command], queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
    **options
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 714, in send_task
    with self.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/objects.py", line 85, in __enter__
    *self.fb_args, **self.fb_kwargs
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 62, in prepare
    p = p()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer
    conn = self._acquire_connection()
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 39, in _acquire_connection
    return self.connections.acquire(block=True)
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 936, in prepare
    resource = resource()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 908, in new
    return self.connection.clone()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 587, in clone
    return self.__class__(**dict(self._info(resolve=False), **kwargs))
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 597, in _info
    D = self.transport.default_connection_params
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 832, in transport
    self._transport = self.create_transport()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 576, in create_transport
    return self.get_transport_cls()(client=self)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 582, in get_transport_cls
    transport_cls = get_transport_cls(transport_cls)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 81, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 62, in resolve_transport
    raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: sqla'
2016-11-12 07:34:14,399 ERROR:airflow.jobs.SchedulerJob[MainThread] Tachycardia!

The pip install of airflow[celery] now pulls in celery 4.0.0 and kombu
4.0.0. On checking further, we found that SQLAlchemy has been removed as a
supported broker transport in the latest version of kombu:
https://github.com/celery/kombu/commit/1cd4e07f9ebb2fdbde0f86054e963f6bbd17e698#diff-7b8e685a0148804c7b353dc3f138d189

As a hotfix, we manually downgraded celery to 3.1.15, which uses kombu
3.0.37. That version defines sqla in the TRANSPORT_ALIASES dict of
kombu/transport/__init__.py
(http://docs.celeryproject.org/projects/kombu/en/latest/_modules/kombu/transport.html),
whereas the __init__.py in kombu 4.0.0 no longer has it.
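
For anyone who wants to confirm the same thing on their own machine, here is
a minimal sketch of the check we did by hand. It only looks at whether the
sqla alias is present in kombu's TRANSPORT_ALIASES dict. The helper names
has_transport and check_installed_kombu are made up for this example and are
not part of kombu or Airflow.

```python
def has_transport(aliases, name="sqla"):
    """Return True if `name` is a key in a transport-alias mapping."""
    return name in aliases


def check_installed_kombu(name="sqla"):
    """Inspect whichever kombu is installed in this environment.

    Returns (kombu_version, alias_present), or None if kombu cannot be
    imported at all.
    """
    try:
        import kombu
        from kombu.transport import TRANSPORT_ALIASES
    except ImportError:
        return None
    return kombu.__version__, has_transport(TRANSPORT_ALIASES, name)
```

Calling check_installed_kombu() on a worker or scheduler host gives the
installed kombu version together with a flag saying whether the sqla
transport can be resolved, which may be a useful sanity check to run after
pinning versions.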

Is this a known issue, or has something changed in Celery that we need to
account for in our setup?

Please help. I would like your advice before I make any changes to the chef
scripts for airflow setup.

Thanks,
Nadeem
