airflow-dev mailing list archives

From: Ruslan Dautkhanov <dautkha...@gmail.com>
Subject: Re: Exception in worker when pickling DAGs
Date: Mon, 23 Oct 2017 17:35:10 GMT
I looked over cloudpickle, which Alek mentioned. Looks cool - thanks for
referencing it.
https://github.com/cloudpipe/cloudpickle
It could be a drop-in replacement for pickle and low-hanging fruit for
fixing this issue.
I don't see any problem with storing cloudpickled objects in a database
table's blob field.
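
For example, a quick sketch of the round trip (an in-memory sqlite table
stands in here for Airflow's dag_pickle table; names and schema are
illustrative only):

    import sqlite3
    import cloudpickle

    def transform(ds):
        return 'processed ' + ds

    # cloudpickle serializes functions from the running script by value,
    # so the reader needs no matching module on its side
    blob = cloudpickle.dumps(transform)

    # round-trip through a blob column
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE dag_pickle (id INTEGER PRIMARY KEY, pickle BLOB)')
    conn.execute('INSERT INTO dag_pickle (pickle) VALUES (?)',
                 (sqlite3.Binary(blob),))
    raw = conn.execute('SELECT pickle FROM dag_pickle').fetchone()[0]
    print(cloudpickle.loads(bytes(raw))('2017-10-23'))  # processed 2017-10-23
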
Thanks.


-- 
Ruslan Dautkhanov

On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:

> One issue that's been standing in the way is the fact that Jinja template
> objects are not pickleable. That, and the fact that when people pass objects
> into their DAG objects (through params, callbacks, or other means), the
> serialization can get tangled and the pickles become gigantic. People
> typically don't understand the implications in that context.
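>
> A contrived example of how a callback quietly blows up the pickle size
> (using dill, which is what Airflow's pickling uses here):
>
>     import dill
>
>     def make_callback():
>         big = list(range(1000000))     # some fat object handed to the DAG
>         def on_failure(context):
>             return len(big)            # quietly closes over `big`
>         return on_failure
>
>     # dill serializes closure cells by value, so `big` rides along
>     print(len(dill.dumps(make_callback())))    # several megabytes
>     print(len(dill.dumps(lambda context: 0)))  # a few hundred bytes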
>
> For now there's a workaround for the Jinja template pickling issue that
> limits what you can do with Jinja (you'll notice that extends and imports
> just won't work in a pickle/remote setting).
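>
> The underlying failure is easy to reproduce outside Airflow (a quick
> illustrative check, not Airflow code):
>
>     import pickle
>     from jinja2 import Template
>
>     t = Template('Hello {{ name }}!')
>     try:
>         pickle.dumps(t)
>     except Exception as e:
>         # the compiled template wraps code objects and an Environment,
>         # which the stock pickler can't serialize
>         print('not pickleable: %s' % e)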
>
> I remember spending a few hours trying to pickle Jinja templates in the
> first weeks of the project and ultimately giving up. I'm sure someone could
> get that working.
>
> Here's another related question: is the database a proper transport layer
> for pickles? It feels like a hack to me...
>
> Another idea that was discussed was to create a new BaseDagFetcher
> abstraction, along with a replacement for the current implementation:
> FilesystemDagFetcher. Then people could write/use whatever other
> implementations like S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher,
> ....
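>
> Roughly something like this (all names invented for illustration - none
> of this exists in Airflow today):
>
>     import os
>
>     class BaseDagFetcher(object):
>         """Fetches DAG definition files from some backing store."""
>         def fetch(self, dag_id):
>             """Return a local path to the file defining dag_id."""
>             raise NotImplementedError
>
>     class FilesystemDagFetcher(BaseDagFetcher):
>         """Today's behavior: DAG files already sit on local disk."""
>         def __init__(self, dags_folder):
>             self.dags_folder = dags_folder
>
>         def fetch(self, dag_id):
>             # assumes one DAG per file, named after its dag_id
>             return os.path.join(self.dags_folder, dag_id + '.py')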
>
> Max
>
> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <alek.storm@gmail.com> wrote:
>
> > That's disappointing. The promise of being able to deploy code just to the
> > Airflow master, and have that automatically propagated to workers, was a
> > major selling point for us when we chose Airflow over its alternatives - it
> > would greatly simplify our deploy tooling, and free us from having to worry
> > about DAG definitions getting out of sync between the master and workers.
> >
> > Perhaps the cloudpickle library, which came out of PySpark, could help
> > here: https://github.com/cloudpipe/cloudpickle. It appears to be
> > specifically designed for shipping Python code over a network.
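> >
> > A short contrast of the two picklers (a toy round-trip in one process,
> > standing in for the network hop):
> >
> >     import pickle
> >     import cloudpickle
> >
> >     fn = lambda x: x * 2
> >     try:
> >         pickle.dumps(fn)         # stock pickle stores only a module-path
> >     except Exception as e:       # reference, and a lambda has none
> >         print('pickle: %s' % e)
> >
> >     payload = cloudpickle.dumps(fn)        # ships the code object by value
> >     print(cloudpickle.loads(payload)(21))  # 42 - no import needed on load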
> >
> > Alek
> >
> > On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> >
> > > FYI: there's been talk of deprecating pickling altogether, as it's very
> > > brittle.
> > >
> > > Max
> > >
> > > > On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <alek.storm@gmail.com> wrote:
> > >
> > > > Can anyone help with this? Has anyone successfully used Airflow with
> > > > pickling turned on who can give details on their setup?
> > > >
> > > > Thanks,
> > > > Alek
> > > >
> > > > > On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <alek.storm@gmail.com> wrote:
> > > >
> > > > > Yes, everything's correctly imported - everything works fine when I run
> > > > > the scheduler without pickling turned on.
> > > > >
> > > > > Thanks,
> > > > > Alek
> > > > >
> > > > > On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <edgar.rodriguez@airbnb.com.invalid> wrote:
> > > > >
> > > > >> The relevant part seems to be:
> > > > >>
> > > > >> ImportError: No module named
> > > > >> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > > >> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> > > > >> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> > > > >> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> > > > >> non-zero exit status 1
> > > > >>
> > > > >> Did you check that your task and script `fetch_orgmast.py` are
> > > > >> correctly importing all modules that they use?
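> > > > >>
> > > > >> For background, that mangled name comes from how the DagBag loads DAG
> > > > >> files: each file is imported under a synthetic module name, which only
> > > > >> exists in the process that created it. Very roughly (a simplified
> > > > >> sketch, not the literal Airflow code):
> > > > >>
> > > > >>     import imp, hashlib
> > > > >>
> > > > >>     filepath = '/site/conf/airflow/dags/data/fetch_orgmast.py'
> > > > >>     mod_name = ('unusual_prefix_'
> > > > >>                 + hashlib.sha1(filepath).hexdigest()
> > > > >>                 + '_fetch_orgmast')
> > > > >>     mod = imp.load_source(mod_name, filepath)
> > > > >>
> > > > >>     # dill records objects by module path, so unpickling on a worker
> > > > >>     # calls __import__(mod_name) - which fails there, because that
> > > > >>     # synthetic module was only ever created on the scheduler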
> > > > >>
> > > > >> Cheers,
> > > > >> Edgar
> > > > >>
> > > > >> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <alek.storm@gmail.com> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > When running the scheduler as `airflow scheduler -p` and the worker
> > > > >> > (on a different box) as `airflow worker -q foo`, I get the following
> > > > >> > exception in the worker process when triggering a DAG run manually:
> > > > >> >
> > > > >> > Traceback (most recent call last):
> > > > >> >   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> > > > >> >     args.func(args)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> > > > >> >     DagPickle).filter(DagPickle.id == args.pickle).first()
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> > > > >> >     ret = list(self[0:1])
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> > > > >> >     return list(res)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> > > > >> >     util.raise_from_cause(err)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> > > > >> >     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> > > > >> >     rows = [proc(row) for row in fetch]
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> > > > >> >     loaded_instance, populate_existing, populators)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> > > > >> >     dict_[key] = getter(row)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> > > > >> >     return loads(value)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> > > > >> >     return load(file)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> > > > >> >     obj = pik.load()
> > > > >> >   File "/usr/lib/python2.7/pickle.py", line 858, in load
> > > > >> >     dispatch[key](self)
> > > > >> >   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> > > > >> >     klass = self.find_class(module, name)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> > > > >> >     return StockUnpickler.find_class(self, module, name)
> > > > >> >   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> > > > >> >     __import__(module)
> > > > >> > ImportError: No module named
> > > > >> > unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> > > > >> > [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> > > > >> > fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> > > > >> > --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> > > > >> > non-zero exit status 1
> > > > >> > [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
> > > > >> > airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
> > > > >> > raised unexpected: AirflowException('Celery command failed',)
> > > > >> > Traceback (most recent call last):
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> > > > >> >     R = retval = fun(*args, **kwargs)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> > > > >> >     return self.run(*args, **kwargs)
> > > > >> >   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> > > > >> >     raise AirflowException('Celery command failed')
> > > > >> > AirflowException: Celery command failed
> > > > >> >
> > > > >> > I'm using the CeleryExecutor, with Postgres 9.6 for both the Airflow
> > > > >> > metadata database and the Celery result backend. Python version is
> > > > >> > 2.7.6. What am I doing wrong?
> > > > >> >
> > > > >> > Alek
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
