flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Issue with parallelism when CoGrouping custom nested data type
Date Wed, 16 Sep 2015 12:27:48 GMT
Sorry, I was overcomplicating things. Forget about the methods I mentioned.

If you are implementing WritableComparable types, you need to override the
hashCode() method.
Flink treats WritableComparable types just like Hadoop does [1].
DawnData does not override hashCode(), so it falls back to Object.hashCode(),
which is not stable across objects or JVMs. This causes inconsistent hash
partitioning.
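
The effect can be illustrated with a minimal, self-contained sketch. It does not use Hadoop or Flink, and it is not taken from the dawn-flink code: UnstableKey, StableKey and partitionOf are hypothetical names, and the partitioner is a simplified stand-in (hash modulo parallelism) rather than Flink's actual implementation.

```scala
// Sketch only: UnstableKey and StableKey are hypothetical stand-ins for a
// custom key type such as DawnData; partitionOf is a simplified model of
// hash partitioning, not Flink's real partitioner.
object HashPartitionSketch {

  // No hashCode() override: falls back to Object.hashCode(), which is
  // identity-based. Two instances with the same content are not guaranteed
  // to produce the same hash code.
  final class UnstableKey(val id: String)

  // hashCode() is derived only from the content, so equal keys hash
  // identically in every task and on every JVM.
  final class StableKey(val id: String) {
    override def equals(other: Any): Boolean = other match {
      case that: StableKey => id == that.id
      case _               => false
    }
    override def hashCode(): Int = id.hashCode
  }

  // Simplified hash partitioning: non-negative hash modulo the parallelism.
  def partitionOf(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode(), parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 4

    // Same content, two instances, as if deserialized by two different tasks.
    val stableA = new StableKey("person0")
    val stableB = new StableKey("person0")
    println(partitionOf(stableA, parallelism) == partitionOf(stableB, parallelism))

    // With the identity hash there is no such guarantee, so matching
    // records can end up in different partitions.
    val unstableA = new UnstableKey("person0")
    val unstableB = new UnstableKey("person0")
    println(partitionOf(unstableA, parallelism) == partitionOf(unstableB, parallelism))
  }
}
```

With a parallelism of 1 every key lands in partition 0 regardless of its hash, which would be consistent with the job producing correct results only at that setting.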

Please let me know if that solves your problem.

Cheers, Fabian

[1]
https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/

2015-09-16 14:12 GMT+02:00 Pieter Hameete <phameete@gmail.com>:

> Hi,
>
> I haven't been able to find the problem yet, and I don't know exactly how to
> check the methods you suggested earlier (extractKeys,
> getFlatComparators, duplicate) in the Scala API. Do you have some pointers
> for me on how I can check these myself?
>
> In my earlier mail I stated that maps, filters and reduces work fine. I
> found that this was not correct: for my previous queries I have only used
> maps and filters. I made an extra test and found that the following
> code using a reduce also generates faulty results when increasing
> parallelism past 1:
>
> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def test = auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup( (groupedauctions, out : Collector[DawnData]) => {
>   out.collect(new DawnData(groupedauctions.size))
> }).setParallelism(1)
> test.print
>
> Does this indicate that something else could be wrong with the custom
> datatype?
>
> You can find the corresponding code and a small dataset at
> https://github.com/PHameete/dawn-flink in the *development* branch. It is
> a Scala Maven project so you should be able to run the
> *main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of
> the box to run the query from my first email. In this class you can also
> change the query that is being run or run multiple queries. If this does not
> work please let me know!
>
> Kind regards and cheers again!
>
> - Pieter
>
>
>
>
> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>
>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>>
>> I figured something must be wrong with my data type. I have no doubt
>> the CoGroup works just fine :-) What I am looking for are pointers on what
>> to investigate in my data type. Initially I had problems with serialization
>> causing strange issues as well; these were resolved after I rewrote
>> my serialization, so I believe that is working OK.
>>
>> I'll try looking into the data type some more with your tips. If I can't
>> figure it out I'll share the repository with you later today or tomorrow.
>>
>> Kind regards,
>>
>> Pieter
>>
>>
>>
>>
>>
>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> This sounds like a problem with your custom type and its (presumably)
>>> custom serializers and comparators.
>>>
>>> I assume it is not an issue of partitioning or sorting because Reduce is
>>> working fine, as you reported.
>>> CoGroup also partitions and sorts data, but in addition compares the
>>> elements of two sorted streams.
>>>
>>> I would check the following methods:
>>> - extractKeys
>>> - getFlatComparators
>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>> comparators)
>>>
>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>>
>>> Cheers, Fabian
>>>
>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>
>>>> Hi Pieter,
>>>>
>>>> your code doesn't look suspicious at first glance. Would it be
>>>> possible for you to post a complete example with data (also possible to
>>>> include it in the code) to reproduce your problem?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phameete@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear fellow Flinkers,
>>>>>
>>>>> I am implementing queries from the XMark (
>>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>>> custom nested data type. Reading the XML data generated by the XMark
>>>>> generator into my custom nested datatype works perfectly, and the queries
>>>>> that I have implemented so far using mostly map, reduce and filter produce
>>>>> correct results.
>>>>>
>>>>> For the next query I wish to cogroup a dataset containing person data
>>>>> with a dataset containing auction data, joined by the *personid* of
>>>>> the person and the *personid* of the buyer of an auction, so that I
>>>>> can count the number of purchases of a person. I select this *personid*
>>>>> as key from the custom nested data type in the *where* and *equalTo*
>>>>> functions of the *coGroup*. The XML2DawnInputFormat is my custom input format
>>>>> that reads XML into my custom nested datatype *DawnData*. The
>>>>> 'inputGraph' and 'auctionInput' are a projection on the XML input to
>>>>> prevent reading unnecessary data.
>>>>>
>>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>>> def result = persons.coGroup(auctions)
>>>>>   .where(person => { person.select("2/@id/2") })
>>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>>     // my cogroup function here that outputs the name of the person and the number of auctions
>>>>>   }).setParallelism(1)
>>>>>
>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>> that if I raise the parallelism of the coGroup above 1 the data will get
>>>>> mixed up. Often the auctions Iterator will be empty, and sometimes there
>>>>> are non-empty auction iterators passed to the cogroup function where the
>>>>> persons iterator is empty, but this is impossible because all buyers exist
>>>>> in the persons database!
>>>>>
>>>>> If anyone has some pointers for me why this code starts producing
>>>>> strange results when parallelism is set above 1 this would be greatly
>>>>> appreciated :-)
>>>>>
>>>>> Kind regards.
>>>>>
>>>>> Pieter Hameete
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
