flink-dev mailing list archives

From Geoffrey Mon <geof...@gmail.com>
Subject Re: Exception from in-progress implementation of Python API bulk iterations
Date Thu, 13 Oct 2016 19:15:54 GMT
Thank you very much. Disabling chaining with the Python API allows my
actual script to run properly. The division by zero must be an issue with
the job that I posted on gist.

Does that mean that the issue must be in the chaining part of the API?
As I understand it, chaining is an important optimization, and it would
matter for the performance comparison I wish to make in my project.
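
To make concrete why chaining matters for performance, here is a minimal
plain-Python sketch. It is not the code in Environment.py; the names
`chain`, `run_unchained`, and `run_chained` are hypothetical, and I am only
assuming that the Python API fuses consecutive per-record operators in a
broadly similar spirit. The point is that fused operators traverse the data
once instead of once per operator.

```python
def chain(*funcs):
    """Fuse several per-record functions into a single function.

    Hypothetical helper for illustration; not part of the Flink Python API.
    """
    def chained(record):
        # Apply each function in order to the same record.
        for f in funcs:
            record = f(record)
        return record
    return chained


def run_unchained(data, funcs):
    """Apply each operator in its own pass, materializing every intermediate."""
    passes = 0
    for f in funcs:
        data = [f(x) for x in data]  # one full pass per operator
        passes += 1
    return data, passes


def run_chained(data, funcs):
    """Apply all operators in a single pass over the data."""
    fused = chain(*funcs)
    return [fused(x) for x in data], 1
```

Both variants produce the same records; only the number of passes (and, in
a real system, the number of hand-offs between operators) differs.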

Cheers,
Geoffrey

On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <chesnay@apache.org> wrote:

> A temporary workaround appears to be disabling chaining, which you can
> do by commenting out L215 "self._find_chains()" in Environment.py.
> Note that you then run into a division by zero error, but I can't tell
> whether that is a problem with the job or not.
>
> On 13.10.2016 13:41, Chesnay Schepler wrote:
> > Hey Geoffrey,
> >
> > I was able to reproduce the error and will look into it in more detail
> > tomorrow.
> >
> > Regards,
> > Chesnay
> >
> > On 12.10.2016 23:09, Geoffrey Mon wrote:
> >> Hello,
> >>
> >> Has anyone had a chance to look into this? I am currently working on the
> >> problem but I have minimal understanding of how the internal Flink Python
> >> API works; any expertise would be greatly appreciated.
> >>
> >> Thank you very much!
> >>
> >> Geoffrey
> >>
> >> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geofbot@gmail.com> wrote:
> >>
> >>> Hi Chesnay,
> >>>
> >>> Heh, I have discovered that if I do not restart Flink after running my
> >>> original problematic script, then similar issues will manifest themselves
> >>> in other, otherwise working scripts. I haven't been able to completely
> >>> narrow down the problem, but I promise this new script will have a
> >>> ClassCastException that is completely reproducible. :)
> >>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>
> >>> Thanks,
> >>> Geoffrey
> >>>
> >>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <chesnay@apache.org> wrote:
> >>>
> >>> Hello Geoffrey,
> >>>
> >>> this one works for me as well :D
> >>>
> >>> Regards,
> >>> Chesnay
> >>>
> >>> On 28.09.2016 05:38, Geoffrey Mon wrote:
> >>>> Hello Chesnay,
> >>>>
> >>>> Thank you for your help. After receiving your message I recompiled my
> >>>> version of Flink completely, and both the NullPointerException listed in
> >>>> the TODO and the ClassCastException with the join operation went away.
> >>>> Previously, to save time, I had been recompiling only the modules of
> >>>> Flink that had been changed, using "mvn clean install -pl :module", and
> >>>> apparently that may have been causing some of my issues.
> >>>>
> >>>> Now the problem is clearer: when a specific group reduce function in my
> >>>> research project plan file is used within an iteration, I get a
> >>>> ClassCastException:
> >>>> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
> >>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
> >>>>   at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
> >>>>   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >>>>   at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
> >>>>   at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
> >>>>   at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
> >>>>   at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> >>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >>>>   at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> >>>>   at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> >>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
> >>>>   at java.lang.Thread.run(Thread.java:745)
> >>>>
> >>>> I'm not sure why this is causing an exception, and I would greatly
> >>>> appreciate any assistance. I've revised the barebones error-causing plan
> >>>> file to focus on this new error source:
> >>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>> The group reduce function in question seems to work just fine outside of
> >>>> iterations. I have organized the commits and pushed to a new branch to
> >>>> make it easier to test and hopefully review soon:
> >>>> https://github.com/GEOFBOT/flink/tree/new-iterations
> >>>>
> >>>> Cheers,
> >>>> Geoffrey
> >>>>
> >>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <chesnay@apache.org> wrote:
> >>>>> Hello Geoffrey,
> >>>>>
> >>>>> I could not reproduce this issue with the commits and plan you provided.
> >>>>>
> >>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
> >>>>> reverted back to the specified commits) and built Flink from scratch.
> >>>>>
> >>>>> Could you double check that the code you provided produces the error?
> >>>>> Also, which OS/Python version are you using?
> >>>>>
> >>>>> Regards,
> >>>>> Chesnay
> >>>>>
> >>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
> >>>>>> Hello,
> >>>>>>
> >>>>>> I'll try to take a look this week.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Chesnay
> >>>>>>
> >>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
> >>>>>>> Hello all,
> >>>>>>>
> >>>>>>> I have recently been working on adding bulk iterations to the Python
> >>>>>>> API of Flink in order to facilitate a research project I am working
> >>>>>>> on. The current changes can be seen in this GitHub diff:
> >>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
> >>>>>>>
> >>>>>>> This implementation seems to work for, at least, simple examples,
> >>>>>>> such as incrementing numbers in a data set. However, with the
> >>>>>>> transformations required for my project, I get an exception
> >>>>>>> "java.lang.ClassCastException:
> >>>>>>> [B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown
> >>>>>>> from the deserializers called by
> >>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
> >>>>>>>
> >>>>>>> I've created the following simplified Python plan by stripping down my
> >>>>>>> research project code to the problem-causing parts:
> >>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>>>>>
> >>>>>>> I have been working on this issue but I don't have any ideas on what
> >>>>>>> might be the problem. Perhaps someone more knowledgeable about the
> >>>>>>> interior of the Python API could kindly help?
> >>>>>>>
> >>>>>>> Thank you very much.
> >>>>>>>
> >>>>>>> Geoffrey Mon
> >>>>>>>
