flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: spark vs flink batch performance
Date Fri, 18 Nov 2016 10:43:09 GMT
I think this could be very helpful for your study:

http://db-blog.web.cern.ch/blog/luca-canali/2016-09-spark-20-performance-improvements-investigated-flame-graphs

Best,
Flavio

On Fri, Nov 18, 2016 at 11:37 AM, CPC <achalil@gmail.com> wrote:

> Hi Gabor,
>
> Thank you for your kind response. I forgot to mention that I actually have
> three workers. This is why I set the default parallelism to 6.
>
> For CSV reading, I deliberately did not use the CSV reader, since I want to
> run the same code on both Spark and Flink. Collect returns 40k records,
> which is not that big.
>
> I will run the same test with Spark 1.5 and 1.6 as well, to see whether
> the Spark 2.x series brought performance improvements, because in this kind
> of test Spark and Flink used to be either on par or Flink was 10-15% faster
> than Spark. Aside from that, are there any configuration parameters you
> would suggest for fine-tuning Flink?
>
> Best,
> Anıl
>
> On Nov 18, 2016 12:25, "Gábor Gévay" <ggab90@gmail.com> wrote:
>
>> Hello,
>>
>> Your program looks mostly fine, but there are a few minor things that
>> might help a bit:
>>
>> Parallelism: In your attached flink-conf.yaml, you have 2 task slots
>> per task manager, and if you have 1 task manager, then your total
>> number of task slots is also 2. However, your default parallelism is
>> 6. In Flink, the recommended default parallelism is exactly the total
>> number of task slots [1]. (This is in contrast to Spark, where the
>> recommended setting is 2-3 per CPU core [2].)
>>
>> CSV reading: If your input is a CSV file, then you should use
>> readCsvFile (instead of readTextFile and then parsing it manually).
>>
>> Collect call: How large is the DataSet that you are using collect on?
>> If it is large, then we might try to figure out a way to get the top
>> 10 elements without first collecting the DataSet.
>>
>> Best,
>> Gábor
>>
>> [1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
>> [2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
>>
>> 2016-11-16 22:38 GMT+01:00 CPC <achalil@gmail.com>:
>> > Hi all,
>> >
>> > I am trying to compare Spark and Flink batch performance. In my test I am
>> > using ratings.csv from the
>> > http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also
>> > concatenated ratings.csv 16 times to increase the dataset size (390465536
>> > records in total, almost 10 GB). I am reading from Google Storage with
>> > gcs-connector, and the file schema is: userId,movieId,rating,timestamp.
>> > Basically, I am calculating the average rating per movie.
>> >
>> > Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>> >>
>> >> case class Rating(userID: String, movieID: String, rating: Double,
>> >>   date: Timestamp)
>> >>
>> >> def parseRating(line: String): Rating = {
>> >>   val arr = line.split(",")
>> >>   Rating(arr(0), arr(1), arr(2).toDouble,
>> >>     new Timestamp(arr(3).toLong * 1000))
>> >> }
>> >>
>> >> val ratings: DataSet[Rating] =
>> >>   env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>> >>     .map(a => parseRating(a))
>> >> ratings
>> >>   .map(i => (i.movieID, 1, i.rating))
>> >>   .groupBy(0)
>> >>   .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>> >>   .map(i => (i._1, i._3 / i._2))
>> >>   .collect()
>> >>   .sortBy(_._1)
>> >>   .sortBy(_._2)(Ordering.Double.reverse)
>> >>   .take(10)
>> >
>> >
>> > With CombineHint.HASH it takes 3m49s and with CombineHint.SORT 5m9s.
>> >
>> > Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>> >>
>> >> case class Rating(userID: String, movieID: String, rating: Double,
>> >>   date: Timestamp)
>> >>
>> >> def parseRating(line: String): Rating = {
>> >>   val arr = line.split(",")
>> >>   Rating(arr(0), arr(1), arr(2).toDouble,
>> >>     new Timestamp(arr(3).toLong * 1000))
>> >> }
>> >>
>> >> val conf = new SparkConf().setAppName("Simple Application")
>> >> val sc = new SparkContext(conf)
>> >> val keyed: RDD[(String, (Int, Double))] =
>> >>   sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>> >>     .map(parseRating)
>> >>     .map(r => (r.movieID, (1, r.rating)))
>> >> keyed
>> >>   .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
>> >>   .mapValues(i => i._2 / i._1)
>> >>   .collect
>> >>   .sortBy(_._1)
>> >>   .sortBy(a => a._2)(Ordering.Double.reverse)
>> >>   .take(10)
>> >>   .foreach(println)
>> >
>> >
>> > With reduceByKeyLocally it takes 2.9 minutes (about 2m54s) and with
>> > reduceByKey 3.1 minutes (about 3m6s).
>> >
>> > Machine config on Google Cloud:
>> > taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
>> > jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
>> > Java version: jdk-8u102
>> > Flink: 1.1.3
>> > Spark: 2.0.2
>> >
>> > I also attached flink-conf.yaml. Although the gap is not huge, there is a
>> > 40% performance difference between Spark and Flink. Is there something I
>> > am doing wrong? If not, how can I fine-tune Flink, or is it normal for
>> > Spark to perform better on batch data?
>> >
>> > Thank you in advance...
>>
>
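
On Gábor's point about getting the top 10 elements without first collecting
the whole DataSet: one possible approach, sketched here in plain Scala with no
Flink or Spark dependency, is to keep a bounded heap of the k best elements
per partition and then merge the per-partition results. The names TopKSketch,
byRank and topK are hypothetical and do not appear in the thread. The ranking
(average descending, ties broken by movieID ascending) is an assumption chosen
to match the two stable sortBy calls in the quoted program.

```scala
import scala.collection.mutable

object TopKSketch {
  // Hypothetical ranking: higher average first, ties broken by movieID ascending.
  val byRank: Ordering[(String, Double)] = new Ordering[(String, Double)] {
    def compare(a: (String, Double), b: (String, Double)): Int = {
      val c = java.lang.Double.compare(b._2, a._2) // descending by average
      if (c != 0) c else a._1.compareTo(b._1)      // ascending by movieID
    }
  }

  // Keeps at most k elements in memory while scanning the iterator once.
  def topK(items: Iterator[(String, Double)], k: Int): List[(String, Double)] = {
    // PriorityQueue dequeues its maximum; under byRank the maximum is the
    // worst-ranked element, so dequeue() evicts the current worst.
    val heap = mutable.PriorityQueue.empty[(String, Double)](byRank)
    items.foreach { item =>
      heap.enqueue(item)
      if (heap.size > k) heap.dequeue()
    }
    heap.toList.sorted(byRank)
  }

  def main(args: Array[String]): Unit = {
    // Two made-up "partitions" of (movieID, average rating) pairs.
    val partitions = Seq(
      Seq(("m1", 4.5), ("m2", 3.0)),
      Seq(("m3", 5.0), ("m4", 4.5))
    )
    // Local top-k per partition first, then top-k over the merged candidates.
    val merged = topK(partitions.iterator.flatMap(p => topK(p.iterator, 2)), 2)
    println(merged) // prints List((m3,5.0), (m1,4.5))
  }
}
```

In Flink, a function like this could presumably be applied per partition (for
example inside mapPartition) so that only k elements per partition reach the
client. That wiring is not shown here and is untested against Flink 1.1.3.
With only 40k records, as noted in the thread, the saving is likely small.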
