flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Geoffrey Mon <geof...@gmail.com>
Subject Python API iteration issues
Date Sun, 23 Oct 2016 02:58:53 GMT
Hello all,

Thanks to Chesnay for the assistance with my other issues. I have one final
issue that I can't figure out how to solve that should hopefully be the
last one. I have been working on implementing bulk iterations in the Python
API and using said iterations in a research project.

At the moment, I get an interesting exception coming from some deserializer
functions related to the iterators in the Python API. When I use a data set
(in the example, named "S") two times, one time in a join with an iterative
data set within an iteration and then another time after the iteration in a
join with the iteration result, such as in
https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I usually
get the following exception:
Traceback (most recent call last):
  File "/tmp/flink-dist-cache-.../flink/plan.py", line 103, in <module>
    env.execute(local=True)
  File "/tmp/flink-dist-cache-.../.../flink/flink/plan/Environment.py",
line 198, in execute
    operator._go()
  File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py",
line 63, in _go
    self._receive_broadcast_variables()
  File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py",
line 75, in _receive_broadcast_variables
    serializer_data = _get_deserializer(con.read_secondary,
self._env._types)
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 262, in _get_deserializer
    return TupleDeserializer([_get_deserializer(read, custom_types) for _
in range(ord(type))])
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 262, in _get_deserializer
    return TupleDeserializer([_get_deserializer(read, custom_types) for _
in range(ord(type))])
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 262, in _get_deserializer
    return TupleDeserializer([_get_deserializer(read, custom_types) for _
in range(ord(type))])
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 262, in _get_deserializer
    return TupleDeserializer([_get_deserializer(read, custom_types) for _
in range(ord(type))])
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 262, in _get_deserializer
    return TupleDeserializer([_get_deserializer(read, custom_types) for _
in range(ord(type))])
  File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 285, in _get_deserializer
    raise Exception("Unable to find deserializer for type ID " +
str(ord(type)))
Exception: Unable to find deserializer for type ID 0

(I believe that another variant of the error message with "type ID 63" did
appear some of the time, suggesting a race condition or similar, but I
cannot reproduce this at the time of writing) Removing the second reuse of
the data set eliminates the problem.

While tracking down this problem, I found that if I have a data set (named
"S" again in this example) that has zip_with_index and reduce group
operators applied, and I do a join operation with it and the result of any
iterative data set, then I get a similar exception. An example file that
causes the issue is here:
https://gist.github.com/GEOFBOT/8490cc65155862f63306e322d38d276c

In this case, I get:
Traceback (most recent call last):
File "/tmp/flink-dist-cache-.../.../flink/plan.py", line 50, in <module>
env.execute(local=True)
File "/tmp/flink-dist-cache-.../.../flink/flink/plan/Environment.py", line
198, in execute
operator._go()
File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py",
line 64, in _go
self._run()
File "/tmp/flink-dist-cache-.../.../flink/flink/functions/JoinFunction.py",
line 28, in _run
for value in self._iterator:
File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 223, in next
return self._deserializer.deserialize(self._read)
File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 337, in deserialize
f2 = self._d2.deserialize(read)
File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 358, in deserialize
return tuple([s.deserialize(read) for s in self._deserializer])
File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 384, in deserialize
return unpack(">d", read(8))[0]
File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py",
line 188, in _read
print x, str.decode(x)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 1:
ordinal not in range(128)

Interestingly, if the "S" data set is changed to have only "0.0"s (such as
by commenting out L27-30), there is no error. However, this may just be a
way of tricking the deserializer into working.

If anyone has any information or assistance that could help me solve this
issue, I would really appreciate it. It would help me with my project and
also iron out any bugs that may be present in my Python API bulk iteration
implementation so that the feature is ready for production use.

Cheers,
Geoffrey

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message