airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: pickle exhausted error when pulling from xcom
Date Mon, 11 Jul 2016 21:02:29 GMT
Hi,

The BLOB type in MySQL is not very large; from my (also insufficient)
memory, it's 64 KB. You probably want to alter the `pickle` column in your
DB to a MEDIUMBLOB or LONGBLOB.
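A minimal sketch of that change (assuming the schema shown in the quoted
message below; back up the database first). The `xcom` table's `value`
column stores what `xcom_pull()` reads, so it may need the same widening:

```sql
-- MEDIUMBLOB holds up to 16 MB, LONGBLOB up to 4 GB, versus 65,535 bytes
-- for a plain BLOB. Table/column names assume the stock Airflow schema.
ALTER TABLE dag_pickle MODIFY pickle MEDIUMBLOB;
ALTER TABLE xcom MODIFY value MEDIUMBLOB;
```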

Max
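For a sense of scale, a quick illustration (hypothetical data, standard
library only) of how easily a pickled payload outgrows a 65,535-byte BLOB:

```python
import pickle

# Hypothetical XCom-style payload: 10,000 (run_id, status) tuples.
tups = [("run_%d" % i, "success") for i in range(10000)]
blob = pickle.dumps(tups)

# Even a modest payload exceeds MySQL's plain-BLOB limit of 65,535 bytes.
# A column that small can end up holding a truncated pickle, and loading
# the truncated bytes later fails with "pickle exhausted before end of frame".
print(len(blob), len(blob) > 65535)
```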

On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <LClark@intrexon.com> wrote:

> Hello,
>   Since we switched our Airflow system to using MySQL as a model store, I
> have been getting errors like: _pickle.UnpicklingError: pickle exhausted
> before end of frame.  Trace is below.  It occurs after an xcom_pull() and
> the trace goes through sqlalchemy.  A deeper dive into the DB suggests that
> the pickle is being stored in the dag_pickle.pickle column as a blob, which
> has a max size of 65,535 bytes.
> airflow> desc dag_pickle;
> +--------------+------------+------+-----+---------+----------------+
> | Field        | Type       | Null | Key | Default | Extra          |
> +--------------+------------+------+-----+---------+----------------+
> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
> | pickle       | blob       | YES  |     | NULL    |                |
> | created_dttm | datetime   | YES  |     | NULL    |                |
> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
> +--------------+------------+------+-----+---------+----------------+
>
> Any ideas?  I wonder if there is an easy way to switch to a mediumblob.
> I'm looking at models.py now.
>
> thanks,
>
> -Louis
>
> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing
> <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted before
> end of frame
> Traceback (most recent call last):
>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>     result = task_copy.execute(context=context)
>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66,
> in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in
> updatePivotTables
>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>     return pull_fn(task_id=task_ids)
>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>     result = query.first()
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py",
> line 2659, in first
>     ret = list(self[0:1])
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py",
> line 2457, in __getitem__
>     return list(res)
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",
> line 86, in instances
>     util.raise_from_cause(err)
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py",
> line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py",
> line 186, in reraise
>     raise value
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",
> line 74, in instances
>     for row in fetch]
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",
> line 74, in <listcomp>
>     for row in fetch]
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py",
> line 73, in <listcomp>
>     rows = [keyed_tuple([proc(row) for proc in process])
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py",
> line 1253, in process
>     return loads(value)
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py",
> line 260, in loads
>     return load(file)
>   File
> "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py",
> line 250, in load
>     obj = pik.load()
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line
> 1039, in load
>     dispatch[key[0]](self)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line
> 1197, in load_binunicode
>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line
> 234, in read
>     "pickle exhausted before end of frame")
> _pickle.UnpicklingError: pickle exhausted before end of frame
> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to
> [redacted]
> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted before
> end of frame
>
>
>
>
