airflow-dev mailing list archives

From Ruslan Dautkhanov <dautkha...@gmail.com>
Subject Re: Exception in worker when pickling DAGs
Date Mon, 23 Oct 2017 18:39:22 GMT
Thanks Bolke. Good points on pickling vs. an external API.
Here's a bit on cloudpickle performance comparisons:
https://github.com/cloudpipe/cloudpickle/issues/58
https://github.com/cloudpipe/cloudpickle/issues/44
https://github.com/RaRe-Technologies/gensim/issues/558
DAGs aren't normally large enough for memory/performance to be much of a
concern (at least in the cases we work with).
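
For what it's worth, a minimal sketch of the drop-in swap (the payload below
is made up; it just shows where stdlib pickle gives up and cloudpickle doesn't):

    # Sketch only: cloudpickle as a drop-in for pickle when the payload holds
    # things the stdlib pickler rejects (lambdas, callables defined inline in
    # a DAG file, etc.).
    import pickle

    import cloudpickle

    payload = {
        "dag_id": "example_dag",                  # made-up DAG metadata
        "on_failure_callback": lambda ctx: None,  # stdlib pickle chokes on this
    }

    blob = cloudpickle.dumps(payload)   # plain bytes, fine for a BLOB column
    restored = cloudpickle.loads(blob)  # loads() is pickle-compatible
    print(restored["dag_id"])

    try:
        pickle.dumps(payload)
    except pickle.PicklingError as exc:  # the lambda is the culprit
        print("stdlib pickle failed:", exc)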


Best regards,
Ruslan

On Mon, Oct 23, 2017 at 11:59 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:

> But cloudpickle looks promising. What are the speed and memory requirements?
>
> Bolke
>
> Sent from my iPad
>
> > On 23 Oct 2017 at 19:57, Bolke de Bruin <bdbruin@gmail.com> wrote:
> >
> > The other option is to use zipped DAGs and have those picked up by the
> > workers from the API. This is less error prone than pickling (marshmallow,
> > cloudpickle). I have a working prototype for this, but it needs to be
> > updated to the current Airflow.
> >
> > Another option is to use copyreg for fields that are difficult to
> > serialize and apply that to the jinja2 template fields. This would allow
> > them to be pickled.
> >
> > But I think we should deprecate pickling altogether and move over to
> > something external/API-based.
> >
> > Cheers
> > Bolke
> >
> > Sent from my iPad
> >
> >> On 23 Oct 2017 at 19:35, Ruslan Dautkhanov <dautkhanov@gmail.com> wrote:
> >>
> >> I looked over the cloudpickle library that Alek mentioned. Looks cool -
> >> thanks for referencing that.
> >> https://github.com/cloudpipe/cloudpickle
> >> It could be a drop-in replacement for pickle and a low-hanging fruit to
> >> fix this issue.
> >> I don't see an issue with storing cloudpickled objects in a database
> >> table's blob field.
> >> Thanks.
> >>
> >>
> >> --
> >> Ruslan Dautkhanov
> >>
> >> On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <
> >> maximebeauchemin@gmail.com> wrote:
> >>
> >>> One issue that's been standing in the way is the fact that Jinja
> >>> template objects are not pickleable. That and the fact that when people
> >>> pass objects into their DAG objects (through params, callbacks or
> >>> whatever other ways), the serialization can get tangled and pickles
> >>> become gigantic. People typically don't understand the implications in
> >>> that context.
> >>>
> >>> For now there's a workaround for the Jinja template pickling issue that
> >>> limits what you can do with Jinja (you'll notice that extends and
> >>> imports just won't work in a pickle/remote setting).
> >>>
> >>> I remember spending a few hours trying to pickle Jinja templates in the
> >>> first weeks of the project and ultimately giving up. I'm sure someone
> >>> could get that working.
> >>>
> >>> Here's another related question: is the database a proper transport
> >>> layer for pickles? It feels like a hack to me...
> >>>
> >>> Another idea that was discussed was to create a new BaseDagFetcher
> >>> abstraction, along with a replacement for the current implementation:
> >>> FilesystemDagFetcher. Then people could write/use whatever other
> >>> implementations they like: S3ZipDagFetcher, HDFSDagFetcher,
> >>> PickleDagFetcher, ...
> >>>
> >>> Max
> >>>
> >>>> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <alek.storm@gmail.com>
> >>>> wrote:
> >>>>
> >>>> That's disappointing. The promise of being able to deploy code just to
> >>>> the Airflow master, and have that automatically propagated to workers,
> >>>> was a major selling point for us when we chose Airflow over its
> >>>> alternatives - it would greatly simplify our deploy tooling, and free us
> >>>> from having to worry about DAG definitions getting out of sync between
> >>>> the master and workers.
> >>>>
> >>>> Perhaps the cloudpickle library, which came out of PySpark, could help
> >>>> here: https://github.com/cloudpipe/cloudpickle. It appears to be
> >>>> specifically designed for shipping Python code over a network.
> >>>>
> >>>> Alek
> >>>>
> >>>> On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <
> >>>> maximebeauchemin@gmail.com> wrote:
> >>>>
> >>>>> FYI: there's been talk of deprecating pickling altogether as it's very
> >>>>> brittle.
> >>>>>
> >>>>> Max
> >>>>>
> >>>>> On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <alek.storm@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Can anyone help with this? Has anyone successfully used Airflow with
> >>>>>> pickling turned on who can give details on their setup?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Alek
> >>>>>>
> >>>>>> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <alek.storm@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Yes, everything's correctly imported - everything works fine when I
> >>>>>>> run the scheduler without pickling turned on.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Alek
> >>>>>>>
> >>>>>>> On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <
> >>>>>>> edgar.rodriguez@airbnb.com.invalid> wrote:
> >>>>>>>
> >>>>>>>> The relevant part seems to be:
> >>>>>>>>
> >>>>>>>> ImportError: No module named
> >>>>>>>> unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> >>>>>>>> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> >>>>>>>> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> >>>>>>>> non-zero exit status 1
> >>>>>>>>
> >>>>>>>> Did you check that your task and script `fetch_orgmast.py` are
> >>>>>>>> correctly importing all modules that they use?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Edgar
> >>>>>>>>
> >>>>>>>>> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <alek.storm@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> When running the scheduler as airflow scheduler -p and the worker
> >>>>>>>>> (on a different box) as airflow worker -q foo, I get the following
> >>>>>>>>> exception in the worker process when triggering a DAG run manually:
> >>>>>>>>>
> >>>>>>>>> Traceback (most recent call last):
> >>>>>>>>>   File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
> >>>>>>>>>     args.func(args)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
> >>>>>>>>>     DagPickle).filter(DagPickle.id == args.pickle).first()
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
> >>>>>>>>>     ret = list(self[0:1])
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
> >>>>>>>>>     return list(res)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
> >>>>>>>>>     util.raise_from_cause(err)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> >>>>>>>>>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
> >>>>>>>>>     rows = [proc(row) for row in fetch]
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
> >>>>>>>>>     loaded_instance, populate_existing, populators)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
> >>>>>>>>>     dict_[key] = getter(row)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
> >>>>>>>>>     return loads(value)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
> >>>>>>>>>     return load(file)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
> >>>>>>>>>     obj = pik.load()
> >>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
> >>>>>>>>>     dispatch[key](self)
> >>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
> >>>>>>>>>     klass = self.find_class(module, name)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
> >>>>>>>>>     return StockUnpickler.find_class(self, module, name)
> >>>>>>>>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
> >>>>>>>>>     __import__(module)
> >>>>>>>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
> >>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run
> >>>>>>>>> fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599
> >>>>>>>>> --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned
> >>>>>>>>> non-zero exit status 1
> >>>>>>>>> [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task
> >>>>>>>>> airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f]
> >>>>>>>>> raised unexpected: AirflowException('Celery command failed',)
> >>>>>>>>> Traceback (most recent call last):
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
> >>>>>>>>>     R = retval = fun(*args, **kwargs)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
> >>>>>>>>>     return self.run(*args, **kwargs)
> >>>>>>>>>   File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
> >>>>>>>>>     raise AirflowException('Celery command failed')
> >>>>>>>>> AirflowException: Celery command failed
> >>>>>>>>>
> >>>>>>>>> I'm using the CeleryExecutor, with Postgres 9.6 for both the
> >>>>>>>>> Airflow data and the Celery result backend. Python version is
> >>>>>>>>> 2.7.6. What am I doing wrong?
> >>>>>>>>>
> >>>>>>>>> Alek
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
>
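
A rough sketch of the copyreg idea from Bolke's note above, under the
assumption that the template source is still available somewhere - jinja2
Templates don't keep their source, so the helper below stashes it on a
made-up _source attribute:

    # Sketch only: register a reducer so pickling a jinja2 Template re-creates
    # it from its source instead of serializing the compiled code objects.
    import copyreg
    import pickle

    import jinja2


    def make_template(source):
        """Build a Template and remember the source it was compiled from."""
        tmpl = jinja2.Template(source)
        tmpl._source = source  # hypothetical attribute; jinja2 doesn't keep this
        return tmpl


    def _reduce_template(tmpl):
        # Tell pickle to call make_template(source) on unpickling.
        return (make_template, (tmpl._source,))


    copyreg.pickle(jinja2.Template, _reduce_template)

    t = make_template("echo {{ ds }}")
    roundtripped = pickle.loads(pickle.dumps(t))
    print(roundtripped.render(ds="2017-10-07"))

A real implementation would have to capture the source for every template
field that gets created, which is probably the fiddly part.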
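
And a bare-bones illustration of the BaseDagFetcher idea Max describes -
none of these classes exist in Airflow; the names and signatures here are
invented for the example:

    # Sketch only: an abstraction for "where do DAG definitions come from",
    # with the current filesystem behaviour as one implementation.
    import abc
    import importlib.util
    import os


    class BaseDagFetcher(abc.ABC):
        """Hands DAG definitions to the scheduler/workers, wherever they live."""

        @abc.abstractmethod
        def fetch(self, dag_id):
            """Return the DAG object for the given dag_id."""


    class FilesystemDagFetcher(BaseDagFetcher):
        """Roughly what happens today: import .py files from a dags folder."""

        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id):
            path = os.path.join(self.dags_folder, dag_id + ".py")
            spec = importlib.util.spec_from_file_location(dag_id, path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            # Assumes the module exposes the DAG under its dag_id.
            return getattr(module, dag_id)


    # S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher, ... would differ only
    # in where the bytes come from before they are turned into a module.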
