flink-user mailing list archives

From simone <simone.povosca...@gmail.com>
Subject Re: Strange behavior on filter, group and reduce DataSets
Date Mon, 19 Mar 2018 15:35:14 GMT
Hi Fabian,

This simple program reproduces the behavior:
https://github.com/xseris/Flink-test-union

Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is 
> replicated, filtered, reduced, and unioned.
> There is nothing between the filter and the reduce that could cause
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is,
> the problem lies either in the Flink runtime code or in your functions.
> Given that one plan produces correct results, it might be the Flink
> runtime code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian
>
> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>
>     Ah, thanks for the update!
>     I'll have a look at that.
>
>     2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>
>         Hi Simone,
>
>         Looking at the plan, I don't see why this should be happening.
>         The pseudo code looks fine as well.
>         Any chance that you can create a minimal program to reproduce
>         the problem?
>
>         Thanks,
>         Fabian
>
>         2018-03-19 12:04 GMT+01:00 simone <simone.povoscania@gmail.com>:
>
>             Hi Fabian,
>
>             Object reuse is not enabled. I have attached the execution plan.
>
>             Thanks,
>             Simone
>
>
>             On 19/03/2018 11:36, Fabian Hueske wrote:
>>             Hi,
>>
>>             Union is actually a very simple operator (not even an
>>             operator in Flink terms). It just merges two inputs. There
>>             is no additional logic involved.
>>             Therefore, it cannot emit any records before at least one
>>             of the two ReduceFunctions has sorted its data.
>>             Once the data has been sorted for the ReduceFunction, the
>>             data is reduced and emitted in a pipelined fashion, i.e.,
>>             once the first record is reduced, it is forwarded through
>>             the union into the MapFunction.
>>             So it is not unexpected that Map starts processing before
>>             the ReduceFunction has terminated.
>>
>>             Did you enable object reuse [1]?
>>             If yes, try to disable it. If you want to reuse objects,
>>             you have to be careful in how you implement your functions.
>>             If not, can you share the plan
>>             (ExecutionEnvironment.getExecutionPlan()) that was
>>             generated for the program?
>>
>>             Thanks,
>>             Fabian
>>
>>             [1]
>>             https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>>
>>             2018-03-19 9:51 GMT+01:00 Flavio Pompermaier
>>             <pompermaier@okkam.it>:
>>
>>                 Any help on this? This is very strange: the
>>                 "manual" union of the outputs of the two datasets is
>>                 different from the Flink union of them.
>>                 Could it be a problem with the Flink optimizer?
>>
>>                 Best,
>>                 Flavio
>>
>>                 On Fri, Mar 16, 2018 at 4:01 PM, simone
>>                 <simone.povoscania@gmail.com> wrote:
>>
>>                     Sorry, I translated the code into pseudocode too
>>                     quickly. The real code does use equals.
>>
>>
>>                     On 16/03/2018 15:58, Kien Truong wrote:
>>>
>>>                     Hi,
>>>
>>>                     Just a guess, but string comparison in Java should
>>>                     use the equals method, not the == operator.
>>>
>>>                     Regards,
>>>
>>>                     Kien
>>>
>>>
>>>                     On 3/16/2018 9:47 PM, simone wrote:
>>>>                     subject.getField("field1") == "";
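Fabian describes the job as a pipeline in which the data is replicated, filtered, reduced, and unioned. He also notes that Union adds no logic of its own. That suggests one way to check a minimal reproducer: compute the expected result with plain Java collections, then compare it with what the Flink job returns. The sketch below is not Flink code. The record type `Row`, the parity filters, and the sum reducer are hypothetical placeholders for the real functions in the reproducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

public class UnionReferenceDemo {

    // Hypothetical record type standing in for the job's real data type.
    static final class Row {
        final String key;
        final int value;
        Row(String key, int value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + "=" + value; }
    }

    // One branch of the job: filter, group by key, reduce each group (here: sum).
    static List<Row> filterGroupReduce(List<Row> input, Predicate<Row> filter) {
        Map<String, Integer> sums = new TreeMap<>(); // sorted keys give a deterministic order
        for (Row r : input) {
            if (filter.test(r)) {
                sums.merge(r.key, r.value, Integer::sum);
            }
        }
        List<Row> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(new Row(k, v)));
        return out;
    }

    // Union adds no logic: it is the concatenation of both inputs, duplicates included.
    static List<Row> union(List<Row> left, List<Row> right) {
        List<Row> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // The same input feeds two branches with different filters; the results are unioned.
    static List<Row> runExample() {
        List<Row> data = List.of(
                new Row("a", 1), new Row("b", 2), new Row("a", 3),
                new Row("b", 4), new Row("a", 6));
        List<Row> odd = filterGroupReduce(data, r -> r.value % 2 == 1);
        List<Row> even = filterGroupReduce(data, r -> r.value % 2 == 0);
        return union(odd, even);
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // [a=4, a=6, b=6]
    }
}
```

Flink does not promise any particular order for the records a union emits from its two inputs. When comparing this reference result with the job's output, sort both sides or compare them as multisets.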

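Fabian's warning about object reuse concerns functions that keep references to objects which are later overwritten. Simone reports that reuse is not enabled, so this does not explain the behavior in this thread. The sketch below only illustrates the kind of aliasing bug Fabian refers to. It is plain Java, not Flink's runtime. The `Counter` type and the two `reduce...` methods are hypothetical, and the returned list plays the role of a downstream consumer that holds on to emitted records.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {

    // Hypothetical mutable record, standing in for a POJO or tuple flowing through a job.
    static final class Counter {
        String key;
        long count;
        Counter(String key, long count) { this.key = key; this.count = count; }
        @Override public String toString() { return key + "=" + count; }
    }

    // Emits one result per key but REUSES a single output instance.
    static List<Counter> reduceWithReuse(String[] keys, long[] counts) {
        List<Counter> downstream = new ArrayList<>();
        Counter reused = new Counter(null, 0);
        for (int i = 0; i < keys.length; i++) {
            reused.key = keys[i];
            reused.count = counts[i];
            downstream.add(reused); // downstream stores a reference, not a copy
        }
        return downstream;
    }

    // Same logic, but a fresh object for every emitted record.
    static List<Counter> reduceWithoutReuse(String[] keys, long[] counts) {
        List<Counter> downstream = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            downstream.add(new Counter(keys[i], counts[i]));
        }
        return downstream;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        long[] counts = {1, 2, 3};
        System.out.println(reduceWithReuse(keys, counts));    // [c=3, c=3, c=3]
        System.out.println(reduceWithoutReuse(keys, counts)); // [a=1, b=2, c=3]
    }
}
```

Every element of the first list is the same object, so all entries show the last values written. This is why code that stores or forwards references must either copy objects or avoid reuse.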

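Kien's point about string comparison can be checked in plain Java. `==` compares object identity, while `equals` compares contents. A value built at runtime (for example, deserialized from input) is typically a different object from the literal `""`, so `==` can be false even when both strings are empty. In the sketch below, `getField` is a hypothetical stand-in for the record accessor in Simone's pseudocode, backed by a `Map`.

```java
import java.util.HashMap;
import java.util.Map;

public class StringCompareDemo {

    // Hypothetical stand-in for the record accessor used in the pseudocode.
    static String getField(Map<String, String> record, String name) {
        return record.get(name);
    }

    // Compares references: true only if both sides are the very same String object.
    static boolean isEmptyByReference(String value) {
        return value == "";
    }

    // Compares contents; returns false (instead of throwing) when value is null.
    static boolean isEmptyByContent(String value) {
        return "".equals(value);
    }

    public static void main(String[] args) {
        Map<String, String> record = new HashMap<>();
        // new String(...) creates a distinct object, as a value read from input would be.
        record.put("field1", new String(""));

        String v = getField(record, "field1");
        System.out.println(isEmptyByReference(v)); // false
        System.out.println(isEmptyByContent(v));   // true
    }
}
```

Writing `"".equals(value)` with the literal first also avoids a NullPointerException when the field is null. If the value is known to be non-null, `value.isEmpty()` is an equivalent alternative.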