flink-dev mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Exception from in-progress implementation of Python API bulk iterations
Date Wed, 28 Sep 2016 13:16:17 GMT
Hello Geoffrey,

this one works for me as well :D

Regards,
Chesnay

On 28.09.2016 05:38, Geoffrey Mon wrote:
> Hello Chesnay,
>
> Thank you for your help. After receiving your message I recompiled my
> version of Flink completely, and both the NullPointerException listed in
> the TODO and the ClassCastException with the join operation went away.
> Previously, to save time, I had only been recompiling the modules of Flink
> that I had changed, using "mvn clean install -pl :module", and that may
> have been causing some of my issues.
>
> Now the problem is clearer: when a specific group reduce function in my
> research project plan file is used within an iteration, I get a
> ClassCastException:
> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
> at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
> at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
> at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
> at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
> at java.lang.Thread.run(Thread.java:745)
>
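A note on reading this trace: `[B` is the JVM's internal class name for `byte[]`, so the message says a `Tuple2` record reached `BytePrimitiveArraySerializer`, a serializer for byte arrays. Since the frame below it is `WorksetUpdateOutputCollector`, it appears that a byte[] serializer was set up for the iteration's output while the group reduce emitted tuples; that is a reading of the trace, not a confirmed diagnosis. The minimal sketch that follows does not use Flink at all: `ByteArrayCastDemo` and `castsToByteArray` are hypothetical names, and an `Object[]` stands in for `Tuple2`.

```java
/**
 * Hypothetical demo (no Flink dependency): shows that "[B" names byte[]
 * and that casting a non-byte[] record to byte[] fails at runtime.
 */
public class ByteArrayCastDemo {

    /** Returns true if the value can be cast to byte[] without a ClassCastException. */
    static boolean castsToByteArray(Object value) {
        try {
            // The same kind of cast a byte[] serializer depends on
            byte[] unused = (byte[]) value;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The JVM's internal class name for byte[]
        System.out.println(byte[].class.getName()); // prints [B

        // A real byte[] passes the cast
        System.out.println(castsToByteArray(new byte[] {1, 2, 3})); // prints true

        // Stand-in for a Tuple2 record (assumption: Flink is not on the classpath here)
        Object tupleLikeRecord = new Object[] {"key", 42};
        System.out.println(castsToByteArray(tupleLikeRecord)); // prints false
    }
}
```

The earlier exception in this thread ("[B cannot be cast to org.apache.flink.api.java.tuple.Tuple") is the mirror image: a byte[] arriving where a tuple serializer expected a tuple.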
> I'm not sure why this causes an exception, and I would greatly appreciate
> any assistance. I've revised the stripped-down, error-causing plan file to
> focus on this new error source:
> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> The group reduce function in question seems to work fine outside of
> iterations. I have organized the commits and pushed them to a new branch to
> make testing, and hopefully review, easier:
> https://github.com/GEOFBOT/flink/tree/new-iterations
>
> Cheers,
> Geoffrey
>
> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <chesnay@apache.org> wrote:
>
>> Hello Geoffrey,
>>
>> I could not reproduce this issue with the commits and plan you provided.
>>
>> I tried both the FLINK-4098 and bulk-iterations branches (checked out at
>> the specified commits) and built Flink from scratch.
>>
>> Could you double-check that the code you provided produces the error?
>> Also, which OS and Python version are you using?
>>
>> Regards,
>> Chesnay
>>
>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>> Hello,
>>>
>>> I'll try to take a look this week.
>>>
>>> Regards,
>>> Chesnay
>>>
>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>> Hello all,
>>>>
>>>> I have recently been working on adding bulk iterations to the Python
>>>> API of Flink to support a research project of mine. The current
>>>> changes can be seen in this GitHub diff:
>>>>
>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>
>>>> This implementation seems to work for at least simple examples, such
>>>> as incrementing the numbers in a data set. However, with the
>>>> transformations required for my project, I get the exception
>>>> "java.lang.ClassCastException: [B cannot be cast to
>>>> org.apache.flink.api.java.tuple.Tuple", thrown from the deserializers
>>>> called by
>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
>>>> I've created the following simplified Python plan by stripping down my
>>>> research project code to the problem-causing parts:
>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>
>>>> I have been investigating this issue, but I have no idea what the
>>>> problem might be. Perhaps someone more familiar with the internals of
>>>> the Python API could help?
>>>>
>>>> Thank you very much.
>>>>
>>>> Geoffrey Mon
>>>>
>>>
>>

