airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Exception in worker when pickling DAGs
Date Thu, 12 Oct 2017 20:56:14 GMT
One issue that's been standing in the way is the fact that Jinja template
objects are not pickleable. On top of that, when people pass objects into
their DAG objects (through params, callbacks, or other means), the
serialization can get tangled and the pickles become gigantic. People
typically don't understand the implications in that context.
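
The first problem is easy to reproduce in isolation. A minimal sketch,
plain jinja2 with no Airflow involved:

    import pickle

    import jinja2

    tmpl = jinja2.Template("Hello {{ name }}!")
    # Compiling a template generates render functions and code objects on
    # the fly; the stdlib pickler can't serialize those by value, so this
    # raises a pickling error:
    pickle.dumps(tmpl)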

For now there's a workaround for the Jinja template pickling issue, but
it limits what you can do with Jinja (you'll notice that extends and
imports just won't work in a pickled/remote setting).
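
My recollection (take it as an assumption, I haven't re-read the code) is
that the workaround ships the template source and rebuilds the template
without the original environment/loader, in which case the failure mode
looks roughly like this:

    import jinja2

    # A bare Environment has no loader, so anything that must resolve
    # another template at render time has nothing to resolve against:
    env = jinja2.Environment()
    tmpl = env.from_string('{% extends "base.html" %}')
    tmpl.render()  # TypeError: no loader for this environment specified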

I remember spending a few hours trying to pickle Jinja templates in the
first weeks of the project and ultimately giving up. I'm sure someone
could get that working.

Here's another related question: is the database a proper transport layer
for pickles? It feels like a hack to me...
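
For context, the flow today is roughly the following, as a simplified
sketch (the real model is airflow.models.DagPickle; the column details
here are from memory and may differ):

    import dill
    from sqlalchemy import Column, Integer, PickleType
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class DagPickle(Base):
        # Simplified stand-in for the dag_pickle table
        __tablename__ = 'dag_pickle'
        id = Column(Integer, primary_key=True)
        pickle = Column(PickleType(pickler=dill))

    # Scheduler side: serialize the whole DAG object into a row, e.g.
    #   session.add(DagPickle(pickle=dag)); session.commit()
    # Worker side ('airflow run ... --pickle <id>'): query it back by id,
    # which is exactly the query that fails in the traceback below:
    #   session.query(DagPickle).filter(DagPickle.id == pickle_id).first()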

Another idea that was discussed was to create a new BaseDagFetcher
abstraction, along with a replacement for the current implementation:
FilesystemDagFetcher. People could then write or use whatever other
implementations they need, like S3ZipDagFetcher, HDFSDagFetcher,
PickleDagFetcher, ....
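
Nothing like this exists yet, but the surface area could be tiny. A
sketch of what the interface might look like (all names hypothetical,
lifted straight from the idea above):

    import abc

    class BaseDagFetcher(object):
        """Abstract source of DAG definitions."""
        __metaclass__ = abc.ABCMeta  # Python 2, to match this thread

        @abc.abstractmethod
        def fetch(self, dag_id):
            """Return the DAG definition for dag_id from a backing store."""
            raise NotImplementedError

    class FilesystemDagFetcher(BaseDagFetcher):
        """Would replace today's behavior: read DAG files off local disk."""

        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id):
            # Locate, import, and return the DAG from a .py file under
            # self.dags_folder.
            raise NotImplementedError

An S3ZipDagFetcher or HDFSDagFetcher would then just implement fetch()
against its own storage backend.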

Max

On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <alek.storm@gmail.com> wrote:

> That's disappointing. The promise of being able to deploy code just to the
> Airflow master, and have that automatically propagated to workers, was a
> major selling point for us when we chose Airflow over its alternatives - it
> would greatly simplify our deploy tooling, and free us from having to worry
> about DAG definitions getting out of sync between the master and workers.
>
> Perhaps the cloudpickle library, which came out of PySpark, could help
> here: https://github.com/cloudpipe/cloudpickle. It appears to be
> specifically designed for shipping Python code over a network.
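>
> The key property is that cloudpickle serializes function bodies by value
> rather than by reference, so the receiving side doesn't need to import
> anything by name. A quick sketch:
>
>     import pickle
>
>     import cloudpickle
>
>     def make_adder(n):
>         return lambda x: x + n
>
>     # The stdlib pickler refuses this closure (it isn't importable by
>     # name); cloudpickle ships the actual code instead:
>     blob = cloudpickle.dumps(make_adder(10))
>     print(pickle.loads(blob)(1))  # => 11, even in a fresh process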
>
> Alek
>
> On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
>
> > FYI: there's been talk of deprecating pickling altogether, as it's
> > very brittle.
> >
> > Max
> >
> > On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <alek.storm@gmail.com> wrote:
> >
> > > Can anyone help with this? Has anyone successfully used Airflow with
> > > pickling turned on who can give details on their setup?
> > >
> > > Thanks,
> > > Alek
> > >
> > > On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <alek.storm@gmail.com> wrote:
> > >
> > > > Yes, everything's correctly imported - everything works fine when
> > > > I run the scheduler without pickling turned on.
> > > >
> > > > Thanks,
> > > > Alek
> > > >
> > > > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <edgar.rodriguez@airbnb.com.invalid> wrote:
> > > >
> > > >> The relevant part seems to be:
> > > >>
> > > >> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > >> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> > > >>
> > > >> Did you check that your task and script `fetch_orgmast.py` are
> > > >> correctly importing all modules that they use?
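> > > >>
> > > >> For what it's worth, the mangled name in that error comes from how
> > > >> Airflow loads DAG files: each file is imported under a synthetic
> > > >> module name, roughly like this sketch (not Airflow's exact code):
> > > >>
> > > >>     import hashlib
> > > >>     import imp
> > > >>
> > > >>     filepath = '/site/conf/airflow/dags/data/fetch_orgmast.py'
> > > >>     mod_name = ('unusual_prefix_'
> > > >>                 + hashlib.sha1(filepath).hexdigest()
> > > >>                 + '_fetch_orgmast')
> > > >>     module = imp.load_source(mod_name, filepath)
> > > >>     # dill pickles anything defined in that module by reference to
> > > >>     # mod_name, and no other process can import a module under that
> > > >>     # synthetic name -- hence the ImportError on the worker.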
> > > >>
> > > >> Cheers,
> > > >> Edgar
> > > >>
> > > >> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <alek.storm@gmail.com> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > When running the scheduler as airflow scheduler -p and the
> > > >> > worker (on a different box) as airflow worker -q foo, I get the
> > > >> > following exception in the worker process when triggering a DAG
> > > >> > run manually:
> > > >> >
> > > >> > Traceback (most recent call last):
> > > >> >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> > > >> >     args.func(args)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> > > >> >     DagPickle).filter(DagPickle.id == args.pickle).first()
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> > > >> >     ret = list(self[0:1])
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> > > >> >     return list(res)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> > > >> >     util.raise_from_cause(err)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> > > >> >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> > > >> >     rows = [proc(row) for row in fetch]
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> > > >> >     loaded_instance, populate_existing, populators)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> > > >> >     dict_[key] = getter(row)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> > > >> >     return loads(value)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> > > >> >     return load(file)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> > > >> >     obj = pik.load()
> > > >> >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> > > >> >     dispatch[key](self)
> > > >> >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> > > >> >     klass = self.find_class(module, name)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> > > >> >     return StockUnpickler.find_class(self, module, name)
> > > >> >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> > > >> >     __import__(module)
> > > >> > ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > >> > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
> > > >> > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
> > > >> > Traceback (most recent call last):
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> > > >> >     R = retval = fun(*args, **kwargs)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> > > >> >     return self.run(*args, **kwargs)
> > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> > > >> >     raise AirflowException('Celery command failed')
> > > >> > AirflowException: Celery command failed
> > > >> >
> > > >> > I’m using the CeleryExecutor, with Postgres 9.6 backing both the
> > > >> > Airflow metadata database and the Celery result backend. The
> > > >> > Python version is 2.7.6. What am I doing wrong?
> > > >> >
> > > >> > Alek
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
