airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Exception in worker when pickling DAGs
Date Thu, 12 Oct 2017 19:04:28 GMT
FYI: there's been talk of deprecating pickling altogether, as it's very
brittle.
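
For reference, the usual way to sidestep pickling entirely (a sketch of the
standard CeleryExecutor deployment, not specific to your setup): run the
scheduler without -p and make the same dags_folder available on every worker
box, e.g.

    # scheduler box
    airflow scheduler

    # worker box, with /site/conf/airflow/dags synced or on shared storage
    airflow worker -q foo

Each worker's airflow run command then re-parses the DAG file from its -sd
path instead of unpickling a DagPickle row from the database.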

Max

On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <alek.storm@gmail.com> wrote:

> Can anyone help with this? Has anyone successfully used Airflow with
> pickling turned on who can give details on their setup?
>
> Thanks,
> Alek
>
> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <alek.storm@gmail.com> wrote:
>
> > Yes, everything's correctly imported; everything works fine when I run
> > the scheduler without pickling turned on.
> >
> > Thanks,
> > Alek
> >
> > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <edgar.rodriguez@airbnb.com.invalid> wrote:
> >
> >> The relevant part seems to be:
> >>
> >> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> >>
> >> Did you check that your task and script `fetch_orgmast.py` are correctly
> >> importing all modules that they use?
> >>
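> >> For context, here's a minimal sketch of why that import fails, assuming
> >> the usual DagBag behaviour: Airflow loads each DAG file under a mangled
> >> module name ("unusual_prefix_<sha1 of path>_<filename>"), so objects
> >> defined in fetch_orgmast.py get pickled by reference to a module name
> >> that only exists in the process that parsed the file. The path and
> >> module name below are from your log; `some_callable` is hypothetical.
> >>
> >> import imp
> >> import dill
> >>
> >> # Load the DAG file the way DagBag does, under a synthetic module name
> >> mod = imp.load_source(
> >>     'unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast',
> >>     '/site/conf/airflow/dags/data/fetch_orgmast.py')
> >>
> >> # The module is now in sys.modules, so dill serializes the reference
> >> # by name (the load_global frame in your traceback shows this path)
> >> with open('/tmp/payload.pkl', 'wb') as f:
> >>     f.write(dill.dumps(mod.some_callable))
> >>
> >> # In a fresh interpreter (e.g. the Celery worker loading DagPickle 599),
> >> # the synthetic module is not importable, so unpickling raises the
> >> # ImportError from your traceback:
> >> #   dill.load(open('/tmp/payload.pkl', 'rb'))
> >> #   ImportError: No module named unusual_prefix_..._fetch_orgmast
> >>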
> >> Cheers,
> >> Edgar
> >>
> >> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <alek.storm@gmail.com> wrote:
> >>
> >> > Hi all,
> >> >
> >> > When running the scheduler as airflow scheduler -p and the worker (on a
> >> > different box) as airflow worker -q foo, I get the following exception in
> >> > the worker process when triggering a DAG run manually:
> >> >
> >> > Traceback (most recent call last):
> >> >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> >> >     args.func(args)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> >> >     DagPickle).filter(DagPickle.id == args.pickle).first()
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> >> >     ret = list(self[0:1])
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> >> >     return list(res)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> >> >     util.raise_from_cause(err)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> >> >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> >> >     rows = [proc(row) for row in fetch]
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> >> >     loaded_instance, populate_existing, populators)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> >> >     dict_[key] = getter(row)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> >> >     return loads(value)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> >> >     return load(file)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> >> >     obj = pik.load()
> >> >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> >> >     dispatch[key](self)
> >> >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> >> >     klass = self.find_class(module, name)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> >> >     return StockUnpickler.find_class(self, module, name)
> >> >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> >> >     __import__(module)
> >> > ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >> > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> >> > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
> >> > Traceback (most recent call last):
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> >> >     R = retval = fun(*args, **kwargs)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> >> >     return self.run(*args, **kwargs)
> >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> >> >     raise AirflowException('Celery command failed')
> >> > AirflowException: Celery command failed
> >> >
> >> > I’m using the CeleryExecutor, with Postgres 9.6 as both the Airflow
> >> > metadata database and the Celery result backend. Python version is
> >> > 2.7.6. What am I doing wrong?
> >> >
> >> > Alek
> >> >
> >>
> >
> >
>
