flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Issue with parallelism when CoGrouping custom nested data type
Date Wed, 16 Sep 2015 12:27:48 GMT
Sorry, I was overcomplicating things. Forget about the methods I mentioned.

If you are implementing WritableComparable types, you need to override the
hashCode() method.
Flink treats WritableComparable types just like Hadoop [1].
DawnData does not implement hashCode(), which causes inconsistent hash
partitioning.
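
To illustrate the hashCode() point, here is a minimal sketch in plain Scala with no Flink or Hadoop dependency. KeyWithoutHash, KeyWithHash and partitionOf are hypothetical names made up for this example, and the modulo-based partitioning is a simplifying assumption, not Flink's actual implementation. It only shows that two objects representing the same key are guaranteed to land in the same partition when hashCode() is derived from the key's content.

```scala
// Illustration only: plain Scala, no Flink or Hadoop dependency.
// KeyWithoutHash and KeyWithHash are hypothetical stand-ins for a custom key type.

// Relies on the default, identity-based Object.hashCode.
class KeyWithoutHash(val id: String)

// Overrides equals and hashCode so that equal ids always hash identically.
class KeyWithHash(val id: String) {
  override def equals(other: Any): Boolean = other match {
    case that: KeyWithHash => id == that.id
    case _                 => false
  }
  override def hashCode(): Int = id.hashCode
}

object HashPartitionSketch {
  // Simplified hash partitioning: map a key's hash code to one of
  // `parallelism` partitions. This formula is an assumption for illustration.
  def partitionOf(key: Any, parallelism: Int): Int =
    (key.hashCode() & Int.MaxValue) % parallelism

  def main(args: Array[String]): Unit = {
    val parallelism = 4

    // Two distinct objects that represent the same logical key.
    val a = new KeyWithHash("person0")
    val b = new KeyWithHash("person0")
    // Same content-based hash, hence the same partition.
    println(partitionOf(a, parallelism) == partitionOf(b, parallelism))

    val c = new KeyWithoutHash("person0")
    val d = new KeyWithoutHash("person0")
    // Identity hash codes are unrelated to the id, so this may be true or false.
    println(partitionOf(c, parallelism) == partitionOf(d, parallelism))
  }
}
```

With a parallelism of 1 every key maps to partition 0 in this sketch, which would be consistent with the results only going wrong once the parallelism is raised above 1.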

Please let me know if that solves your problem.

Cheers, Fabian


2015-09-16 14:12 GMT+02:00 Pieter Hameete <phameete@gmail.com>:

> Hi,
> I haven't been able to find the problem yet, and I don't know exactly how to
> check the methods you suggested earlier (extractKeys,
> getFlatComparators, duplicate) for the Scala API. Do you have some pointers
> for me on how I can check these myself?
> In my earlier mail I stated that maps, filters and reduces work fine. I
> found that this was not correct: for my previous queries I have only used
> maps and filters. I made an extra test and found that indeed the following
> code using a reduce also generates faulty results when increasing
> parallelism past 1:
> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def test = auctions
>   .groupBy(_.select("2.buyer.@person").getFirst)
>   .reduceGroup((groupedauctions, out : Collector[DawnData]) => {
>     out.collect(new DawnData(groupedauctions.size))
>   })
>   .setParallelism(1)
> test.print
> Does this indicate that something else could be wrong with the custom
> datatype?
> You can find the corresponding code and a small dataset at
> https://github.com/PHameete/dawn-flink in the *development* branch. It is
> a Scala Maven project so you should be able to run the
> *main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of
> the box to run the query from my first email. In this class you can also
> change the query that's being run or run multiple queries. If this does not
> work please let me know!
> Kind regards and cheers again!
> - Pieter
> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>> I figured something should be wrong with my data type. I have no doubt
>> the CoGroup works just fine :-) What I am looking for is pointers on what
>> to investigate about my datatype. Initially I had problems with serialization
>> causing strange issues as well; these were resolved after I had rewritten
>> my serialization, so I believe that is working OK.
>> I'll try looking into the data type some more with your tips. If I can't
>> figure it out I'll share the repository with you later today or tomorrow.
>> Kind regards,
>> Pieter
>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>> This sounds like a problem with your custom type and its (presumably)
>>> custom serializers and comparators.
>>> I assume it is not an issue of partitioning or sorting because Reduce is
>>> working fine, as you reported.
>>> CoGroup also partitions and sorts data, but it compares the elements of
>>> two sorted streams.
>>> I would check the following methods:
>>> - extractKeys
>>> - getFlatComparators
>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>> comparators)
>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>> Cheers, Fabian
>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>> Hi Pieter,
>>>> your code doesn't look suspicious at first glance. Would it be
>>>> possible for you to post a complete example with data (also possible to
>>>> include it in the code) to reproduce your problem?
>>>> Cheers,
>>>> Till
>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phameete@gmail.com>
>>>> wrote:
>>>>> Dear fellow Flinkers,
>>>>> I am implementing queries from the XMark (
>>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>>> custom nested data type. Reading the XML data generated by the XMark
>>>>> generator into my custom nested datatype works perfectly, and the queries
>>>>> that I have implemented so far using mostly map, reduce and filter produce
>>>>> correct results.
>>>>> For the next query I wish to cogroup a dataset containing person data
>>>>> with a dataset containing auction data, joined by the *personid* of
>>>>> the person and the *personid* of the buyer of an auction, so that I
>>>>> can count the number of purchases of a person. I select this *personid*
>>>>> as key from the custom nested data type in the *where* and *equalTo*
>>>>> of the *coGroup*. The XML2DawnInputFormat is my custom input format
>>>>> that reads XML into my custom nested datatype *DawnData*. The
>>>>> 'inputGraph' and 'auctionInput' are projections on the XML input to
>>>>> prevent reading unnecessary data.
>>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>>> def result = persons.coGroup(auctions)
>>>>>   .where(person => { person.select("2/@id/2") })
>>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>>     // my cogroup function here that outputs the name of the person and the number of auctions
>>>>>   })
>>>>>   .setParallelism(1)
>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>> that if I raise the parallelism of the coGroup above 1 the data gets
>>>>> mixed up. Often the auctions iterator will be empty, and sometimes
>>>>> non-empty auction iterators are passed to the cogroup function while the
>>>>> persons iterator is empty, but this is impossible because all buyers
>>>>> are in the persons database!
>>>>> If anyone has some pointers for me as to why this code starts producing
>>>>> strange results when parallelism is set above 1, this would be greatly
>>>>> appreciated :-)
>>>>> Kind regards.
>>>>> Pieter Hameete
