flink-dev mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Exception from in-progress implementation of Python API bulk iterations
Date Thu, 13 Oct 2016 21:11:23 GMT
The chaining code is definitely related, and I have a pretty clear idea 
of how to fix it.

The odd thing is that the Java API doesn't catch this type mismatch; the 
data types are known when the plan is generated. This kind of error 
shouldn't even happen.
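
For illustration only, here is a minimal sketch of what a plan-time type
check could look like. It is not Flink code: `Operator`, `validate_plan`,
and the operator names are hypothetical, and the two types simply mirror
the Tuple2 versus byte array mismatch in the stack trace quoted below.

```python
# Hypothetical sketch; none of these names exist in Flink.
from dataclasses import dataclass
from typing import List


@dataclass
class Operator:
    name: str
    input_type: type   # type the operator expects to receive
    output_type: type  # type the operator declares it emits


def validate_plan(operators: List[Operator]) -> None:
    """Raise TypeError if consecutive operators disagree on the record type."""
    for upstream, downstream in zip(operators, operators[1:]):
        if upstream.output_type is not downstream.input_type:
            raise TypeError(
                f"{upstream.name} emits {upstream.output_type.__name__}, "
                f"but {downstream.name} expects {downstream.input_type.__name__}"
            )


# A two-operator plan with the same shape of mismatch as in the trace:
# the first operator emits tuples, the second expects raw bytes.
plan = [
    Operator("python_map_partition", bytes, tuple),
    Operator("workset_update", bytes, bytes),
]

try:
    validate_plan(plan)
    plan_error = None
except TypeError as exc:
    plan_error = str(exc)

print(plan_error)
```

With a check of this kind the mismatch would be reported before any task
runs, rather than as a ClassCastException inside a serializer at runtime.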

On 13.10.2016 21:15, Geoffrey Mon wrote:
> Thank you very much. Disabling chaining with the Python API allows my
> actual script to run properly. The division by zero must be an issue with
> the job that I posted on gist.
>
> Does that mean that the issue must be in the chaining part of the API?
> Chaining, as I understand it, is an important optimization that would
> matter for the performance comparison I wish to make in my project.
>
> Cheers,
> Geoffrey
>
> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <chesnay@apache.org> wrote:
>
>> A temporary workaround appears to be disabling chaining, which you can
>> do by commenting out L215 "self._find_chains()" in Environment.py.
>> Note that you then run into a division by zero error, but I can't tell
>> whether that is a problem with the job or not.
>>
>> On 13.10.2016 13:41, Chesnay Schepler wrote:
>>> Hey Geoffrey,
>>>
>>> I was able to reproduce the error and will look into it in more detail
>>> tomorrow.
>>>
>>> Regards,
>>> Chesnay
>>>
>>> On 12.10.2016 23:09, Geoffrey Mon wrote:
>>>> Hello,
>>>>
>>>> Has anyone had a chance to look into this? I am currently working on the
>>>> problem but I have minimal understanding of how the internal Flink
>>>> Python API works; any expertise would be greatly appreciated.
>>>>
>>>> Thank you very much!
>>>>
>>>> Geoffrey
>>>>
>>>> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geofbot@gmail.com> wrote:
>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> Heh, I have discovered that if I do not restart Flink after running my
>>>>> original problematic script, then similar issues manifest themselves
>>>>> in other, otherwise working scripts. I haven't been able to completely
>>>>> narrow down the problem, but I promise this new script will have a
>>>>> ClassCastException that is completely reproducible. :)
>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>
>>>>> Thanks,
>>>>> Geoffrey
>>>>>
>>>>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <chesnay@apache.org>
>>>>> wrote:
>>>>>
>>>>> Hello Geoffrey,
>>>>>
>>>>> this one works for me as well :D
>>>>>
>>>>> Regards,
>>>>> Chesnay
>>>>>
>>>>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>>>>> Hello Chesnay,
>>>>>>
>>>>>> Thank you for your help. After receiving your message I recompiled my
>>>>>> version of Flink completely, and both the NullPointerException listed in
>>>>>> the TODO and the ClassCastException with the join operation went away.
>>>>>> Previously, I had been recompiling only the modules of Flink that had
>>>>>> been changed, to save time, using "mvn clean install -pl :module", and
>>>>>> apparently that may have been causing some of my issues.
>>>>>>
>>>>>> Now the problem is clearer: when a specific group reduce function in my
>>>>>> research project plan file is used within an iteration, I get a
>>>>>> ClassCastException:
>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>>>>> at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>>> at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
>>>>>> at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
>>>>>> at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>>>> at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>>>>> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> I'm not sure why this is causing an exception, and I would greatly
>>>>>> appreciate any assistance. I've revised the barebones error-causing plan
>>>>>> file to focus on this new error source:
>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>> The group reduce function in question seems to work just fine outside of
>>>>>> iterations. I have organized the commits and pushed to a new branch to
>>>>>> make it easier to test and hopefully review soon:
>>>>>> https://github.com/GEOFBOT/flink/tree/new-iterations
>>>>>>
>>>>>> Cheers,
>>>>>> Geoffrey
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <chesnay@apache.org> wrote:
>>>>>>> Hello Geoffrey,
>>>>>>>
>>>>>>> I could not reproduce this issue with the commits and plan you
>>>>>>> provided.
>>>>>>>
>>>>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
>>>>>>> reverted back to the specified commits) and built Flink from scratch.
>>>>>>>
>>>>>>> Could you double check that the code you provided produces the error?
>>>>>>> Also, which OS/Python version are you using?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Chesnay
>>>>>>>
>>>>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'll try to take a look this week.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Chesnay
>>>>>>>>
>>>>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I have recently been working on adding bulk iterations to the Python
>>>>>>>>> API of Flink in order to facilitate a research project I am working on.
>>>>>>>>> The current changes can be seen in this GitHub diff:
>>>>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>>>>>> This implementation seems to work for, at least, simple examples, such as
>>>>>>>>> incrementing numbers in a data set. However, with the transformations
>>>>>>>>> required for my project, I get an exception
>>>>>>>>> "java.lang.ClassCastException: [B cannot be cast to
>>>>>>>>> org.apache.flink.api.java.tuple.Tuple" thrown from the deserializers
>>>>>>>>> called by
>>>>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
>>>>>>>>> I've created the following simplified Python plan by stripping down my
>>>>>>>>> research project code to the problem-causing parts:
>>>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>>>>
>>>>>>>>> I have been working on this issue but I don't have any ideas on what
>>>>>>>>> might be the problem. Perhaps someone more knowledgeable about the
>>>>>>>>> interior of the Python API could kindly help?
>>>>>>>>>
>>>>>>>>> Thank you very much.
>>>>>>>>>
>>>>>>>>> Geoffrey Mon
>>>>>>>>>
>>>
>>

