airflow-dev mailing list archives

From Bolke de Bruin <bdbr...@gmail.com>
Subject Re: Exception in worker when pickling DAGs
Date Mon, 23 Oct 2017 17:57:35 GMT
The other option is to use zipped DAGs and have those picked up by the workers from the API.
This is less error prone than pickling (marshmallow, cloudpickle). I have a working prototype
for this, but it needs to be updated to the current Airflow.
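
For illustration, a rough sketch of the packaging side only (the API-based fetching is
just the prototype mentioned above and is not shown). It assumes Airflow's packaged-DAG
support, where the DagBag can load .zip archives containing DAG modules at their top
level; the file names are made up:

    # Bundle a DAG module into a zip archive that workers can load.
    import zipfile

    with zipfile.ZipFile("my_dags.zip", "w") as zf:
        # DAG modules must sit at the root of the archive.
        zf.write("my_dag.py", arcname="my_dag.py")

    # Dropping my_dags.zip into the dags folder (or shipping it to the workers,
    # e.g. via the API prototype above) lets the DagBag pick it up.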

Another option is to use copyreg for fields that are difficult to serialize and apply that
to the jinja2 template fields. This allows one to pickle them.
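
As a minimal sketch of that copyreg idea: jinja2.Template does not remember its own
source, so this assumes a small helper that stores it on the instance, which the reducer
then uses to rebuild the template on unpickle (the helper names are made up):

    import copyreg  # copy_reg on Python 2
    import pickle
    from jinja2 import Template

    def make_template(source):
        # Hypothetical helper: keep the source so the template can be rebuilt.
        tpl = Template(source)
        tpl._source = source
        return tpl

    def _reduce_template(tpl):
        # Tell pickle to recreate the template from its remembered source.
        return (make_template, (tpl._source,))

    copyreg.pickle(Template, _reduce_template)

    tpl = make_template("Hello {{ name }}!")
    roundtripped = pickle.loads(pickle.dumps(tpl))
    print(roundtripped.render(name="world"))  # Hello world!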

But I think we should deprecate pickling altogether and move over to something external/API-based.

Cheers
Bolke

Sent from my iPad

> On 23 Oct 2017, at 19:35, Ruslan Dautkhanov <dautkhanov@gmail.com> wrote:
> 
> I looked over cloudpickle that Alek mentioned. Looks cool - thanks for
> referencing that.
> https://github.com/cloudpipe/cloudpickle
> It could be a drop-in replacement for pickle and a low-hanging fruit to fix
> this issue.
> I don't see any issue with storing cloudpickled objects in a database table's blob field.
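
For what it's worth, a small sketch of the drop-in use (function and values are made up).
The point is that cloudpickle serializes code by value, so the worker does not need to
import the module the object was defined in:

    import cloudpickle

    def greet(name):
        return "hello %s" % name

    blob = cloudpickle.dumps(greet)    # bytes, fine to store in a BLOB column
    restored = cloudpickle.loads(blob)
    print(restored("airflow"))         # hello airflow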
> Thanks.
> 
> 
> -- 
> Ruslan Dautkhanov
> 
> On Thu, Oct 12, 2017 at 2:56 PM, Maxime Beauchemin <
> maximebeauchemin@gmail.com> wrote:
> 
>> One issue that's been standing in the way is the fact that Jinja template
>> objects are not pickleable. That and the fact that when people pass objects
>> into their DAG objects (through params, callbacks or whatever other ways),
>> the serialization can get tangled and pickles become gigantic. People
>> typically don't understand the implications in that context.
>> 
>> For now there's a workaround for the Jinja template pickling issue that limits
>> what you can do with Jinja (you'll notice that extends and imports just
>> won't work in a pickle/remote setting).
>> 
>> I remember spending a few hours on trying to pickle jinja templates in the
>> first weeks of the project and ultimately giving up. I'm sure someone could
>> get that working.
>> 
>> Here's another related question: is the database a proper transport layer
>> for pickles? It feels like a hack to me...
>> 
>> Another idea that was discussed was to create a new BaseDagFetcher
>> abstraction, along with a replacement for the current implementation:
>> FilesystemDagFetcher. Then people could write/use whatever other
>> implementations like S3ZipDagFetcher, HDFSDagFetcher, PickleDagFetcher,
>> ....
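
Purely as a hypothetical sketch of that fetcher idea (none of these classes exist in
Airflow; the names, signatures, and the boto3 dependency are invented for illustration):

    class BaseDagFetcher(object):
        """Fetch DAG definitions and return a local path to load them from."""
        def fetch(self, dag_id):
            raise NotImplementedError

    class FilesystemDagFetcher(BaseDagFetcher):
        """Today's behaviour: DAG files are already on local disk."""
        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id):
            return self.dags_folder

    class S3ZipDagFetcher(BaseDagFetcher):
        """Download a zipped DAG bundle from S3 and return the local copy."""
        def __init__(self, bucket, key, local_path="/tmp/dags.zip"):
            self.bucket, self.key, self.local_path = bucket, key, local_path

        def fetch(self, dag_id):
            import boto3  # assumes boto3 is available on the workers
            boto3.client("s3").download_file(self.bucket, self.key, self.local_path)
            return self.local_path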
>> 
>> Max
>> 
>>> On Thu, Oct 12, 2017 at 12:29 PM, Alek Storm <alek.storm@gmail.com> wrote:
>>> 
>>> That's disappointing. The promise of being able to deploy code just to the
>>> Airflow master, and have that automatically propagated to workers, was a
>>> major selling point for us when we chose Airflow over its alternatives - it
>>> would greatly simplify our deploy tooling, and free us from having to worry
>>> about DAG definitions getting out of sync between the master and workers.
>>> 
>>> Perhaps the cloudpickle library, which came out of PySpark, could help
>>> here: https://github.com/cloudpipe/cloudpickle. It appears to be
>>> specifically designed for shipping Python code over a network.
>>> 
>>> Alek
>>> 
>>> On Thu, Oct 12, 2017 at 2:04 PM, Maxime Beauchemin <
>>> maximebeauchemin@gmail.com> wrote:
>>> 
>>>> FYI: there's been talk of deprecating pickling altogether as it's very brittle.
>>>> 
>>>> Max
>>>> 
>>>>> On Thu, Oct 12, 2017 at 10:45 AM, Alek Storm <alek.storm@gmail.com> wrote:
>>>> 
>>>>> Can anyone help with this? Has anyone successfully used Airflow with
>>>>> pickling turned on that can give details on their setup?
>>>>> 
>>>>> Thanks,
>>>>> Alek
>>>>> 
>>>>>> On Mon, Oct 9, 2017 at 2:00 PM, Alek Storm <alek.storm@gmail.com> wrote:
>>>>> 
>>>>>> Yes, everything's correctly imported - everything works fine when I run
>>>>>> the scheduler without pickling turned on.
>>>>>> 
>>>>>> Thanks,
>>>>>> Alek
>>>>>> 
>>>>>> On Mon, Oct 9, 2017 at 1:19 PM, Edgar Rodriguez <
>>>>>> edgar.rodriguez@airbnb.com.invalid> wrote:
>>>>>> 
>>>>>>> The relevant part seems to be:
>>>>>>> 
>>>>>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>>>>>> 
>>>>>>> Did you check that your task and script `fetch_orgmast.py` are correctly
>>>>>>> importing all modules that they use?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Edgar
>>>>>>> 
>>>>>>> On Sat, Oct 7, 2017 at 11:25 AM, Alek Storm <alek.storm@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> When running the scheduler as airflow scheduler -p and the worker (on a
>>>>>>>> different box) as airflow worker -q foo, I get the following exception in
>>>>>>>> the worker process when triggering a DAG run manually:
>>>>>>>> 
>>>>>>>> Traceback (most recent call last):
>>>>>>>>  File "/site/var/airflow/venv/bin/airflow", line 28, in <module>
>>>>>>>>    args.func(args)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 393, in run
>>>>>>>>    DagPickle).filter(DagPickle.id == args.pickle).first()
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2755, in first
>>>>>>>>    ret = list(self[0:1])
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
>>>>>>>>    return list(res)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 90, in instances
>>>>>>>>    util.raise_from_cause(err)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>>>>>>>>    reraise(type(exception), exception, tb=exc_tb, cause=cause)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 75, in instances
>>>>>>>>    rows = [proc(row) for row in fetch]
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 437, in _instance
>>>>>>>>    loaded_instance, populate_existing, populators)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 498, in _populate_full
>>>>>>>>    dict_[key] = getter(row)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process
>>>>>>>>    return loads(value)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>>>>>>>>    return load(file)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>>>>>>>>    obj = pik.load()
>>>>>>>>  File "/usr/lib/python2.7/pickle.py", line 858, in load
>>>>>>>>    dispatch[key](self)
>>>>>>>>  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>>>>>>>    klass = self.find_class(module, name)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>>>>>>>>    return StockUnpickler.find_class(self, module, name)
>>>>>>>>  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>>>>>>>    __import__(module)
>>>>>>>> ImportError: No module named unusual_prefix_9b311bfeb8bf0fca09b0857b2b60fba16effe386_fetch_orgmast
>>>>>>>> [2017-10-07 13:18:22,155: ERROR/ForkPoolWorker-5] Command 'airflow run fetch_orgmast latest_only 2017-10-07T13:18:07.489500 --pickle 599 --local -sd /site/conf/airflow/dags/data/fetch_orgmast.py' returned non-zero exit status 1
>>>>>>>> [2017-10-07 13:18:22,196: ERROR/ForkPoolWorker-5] Task airflow.executors.celery_executor.execute_command[c73d77f5-963c-44c1-b633-dc00a752f58f] raised unexpected: AirflowException('Celery command failed',)
>>>>>>>> Traceback (most recent call last):
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
>>>>>>>>    R = retval = fun(*args, **kwargs)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
>>>>>>>>    return self.run(*args, **kwargs)
>>>>>>>>  File "/site/var/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 62, in execute_command
>>>>>>>>    raise AirflowException('Celery command failed')
>>>>>>>> AirflowException: Celery command failed
>>>>>>>> 
>>>>>>>> I’m using the CeleryExecutor and using Postgres 9.6 for both Airflow data
>>>>>>>> and the Celery result backend. Python version is 2.7.6. What am I doing
>>>>>>>> wrong?
>>>>>>>> 
>>>>>>>> Alek
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
