flink-user mailing list archives

From Pieter Hameete <phame...@gmail.com>
Subject Re: Issue with parallelism when CoGrouping custom nested data type
Date Wed, 16 Sep 2015 12:12:50 GMT

I haven't been able to find the problem yet, and I don't know exactly how to
inspect the methods you suggested earlier (extractKeys, getFlatComparators,
duplicate) when using the Scala API. Do you have some pointers for me on how
I can check these myself?

In my earlier mail I stated that maps, filters and reduces work fine. I
found that this was not correct: for my previous queries I have only used
maps and filters. I ran an extra test and found that the following code,
which uses a reduce, also produces faulty results once parallelism is raised
above 1:

def auctions : DataSet[DawnData] =
  env.readFile(new XML2DawnInputFormat(auctionInput), path)
def test = auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup(
  (groupedauctions, out : Collector[DawnData]) => {
    // emit the number of auctions in each buyer group
    out.collect(new DawnData(groupedauctions.size))
  })

Does this indicate that something else could be wrong with the custom
data type?
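
One check that can be run without Flink is whether the grouping key honours
the equals/hashCode contract, since a key that hashes inconsistently would go
unnoticed at parallelism 1 and only scatter records once there is more than
one partition. The following is a minimal sketch of that idea, not a
diagnosis: BrokenKey and GoodKey are hypothetical stand-ins (they are not
classes from the dawn-flink repository), and partition() is a simplified
model of hash partitioning, not Flink's actual partitioner.

```scala
// Hypothetical key types for illustration only.
// BrokenKey inherits identity-based equals/hashCode from AnyRef.
class BrokenKey(val value: String)

// A case class gets value-based equals/hashCode generated by the compiler.
case class GoodKey(value: String)

object KeyContractCheck {
  // Simplified model (an assumption): a record is routed to
  // partition = hashCode mod parallelism.
  def partition(key: Any, parallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, parallelism)

  // True if two keys built independently from the same value always
  // land in the same partition.
  def consistent(mk: String => Any, values: Seq[String], parallelism: Int): Boolean =
    values.forall(v => partition(mk(v), parallelism) == partition(mk(v), parallelism))

  def main(args: Array[String]): Unit = {
    val ids = (0 until 1000).map(i => s"person$i")
    // Value-based hashing: equal keys always share a partition.
    println(consistent(GoodKey(_), ids, 4))
    // Identity-based hashing: equal-looking keys will very likely be split up.
    println(consistent(new BrokenKey(_), ids, 4))
    // With a single partition the problem is invisible.
    println(consistent(new BrokenKey(_), ids, 1))
  }
}
```

If the value returned by the key selector behaves like BrokenKey here, both
the grouped reduce and the coGroup would be expected to misbehave only at
parallelism above 1, which matches the symptom. If it behaves like GoodKey,
the comparator and serializer remain the more likely suspects.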

You can find the corresponding code and a small dataset at
https://github.com/PHameete/dawn-flink in the *development* branch. It is a
Scala Maven project so you should be able to run the
*main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of the
box to run the query from my first email. In this class you can also change
the query that is being run, or run multiple queries. If this does not work
please let me know!

Kind regards and cheers again!

- Pieter

2015-09-16 11:24 GMT+02:00 Pieter Hameete <phameete@gmail.com>:

> Cheers Till and Fabian for your fast replies, it's much appreciated!
> I figured something should be wrong with my data type. I have no doubt the
> CoGroup works just fine :-) What I am looking for is pointers on what to
> investigate about my data type. Initially I had problems with serialization
> that caused strange issues as well; these were resolved after I rewrote
> my serialization, so I believe that is working OK.
> I'll try looking into the data type some more with your tips. If I can't
> figure it out I'll share the repository with you later today or tomorrow.
> Kind regards,
> Pieter
> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>> This sounds like a problem with your custom type and its (presumably)
>> custom serializers and comparators.
>> I assume it is not an issue of partitioning or sorting because Reduce is
>> working fine, as you reported.
>> CoGroup does also partition and sort data, but compares the elements of
>> two sorted streams.
>> I would check the following methods:
>> - extractKeys
>> - getFlatComparators
>> - duplicate (duplicate must return a deep copy, esp. of all nested
>> comparators)
>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>> Cheers, Fabian
>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>> Hi Pieter,
>>> your code doesn't look suspicious at first glance. Would it be
>>> possible for you to post a complete example with data (also possible to
>>> include it in the code) to reproduce your problem?
>>> Cheers,
>>> Till
>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phameete@gmail.com>
>>> wrote:
>>>> Dear fellow Flinkers,
>>>> I am implementing queries from the XMark (
>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>> custom nested data type. Reading the XML data generated by the XMark
>>>> generator into my custom nested datatype works perfectly, and the queries
>>>> that I have implemented so far using mostly map, reduce and filter produce
>>>> correct results.
>>>> For the next query I wish to cogroup a dataset containing person data
>>>> with a dataset containing auction data, joined by the *personid* of
>>>> the person and the *personid* of the buyer of an auction, so that I
>>>> can count the number of purchases of a person. I select this *personid*
>>>> as key from the custom nested data type in the *where* and *equalTo*
>>>> functions of the *coGroup*. The XML2DawnInputFormat is my custom input format
>>>> that reads XML into my custom nested datatype *DawnData*. The
>>>> 'inputGraph' and 'auctionInput' are a projection on the XML input to
>>>> prevent reading unnecessary data.
>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>> def result = persons.coGroup(auctions)
>>>>   .where(person => { person.select("2/@id/2") })
>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>     // my cogroup function here that outputs the name of the person
>>>>     // and the number of auctions
>>>>   }).setParallelism(1)
>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>> that if I raise the parallelism of the coGroup above 1 the data will get
>>>> mixed up. Often the auctions Iterator will be empty, and sometimes there
>>>> are non-empty auction iterators passed to the cogroup function where the
>>>> persons iterator is empty, but this is impossible because all buyers exist
>>>> in the persons database!
>>>> If anyone has some pointers for me why this code starts producing
>>>> strange results when parallelism is set above 1 this would be greatly
>>>> appreciated :-)
>>>> Kind regards.
>>>> Pieter Hameete
