flink-user mailing list archives

From Gábor Gévay <gga...@gmail.com>
Subject Re: spark vs flink batch performance
Date Fri, 18 Nov 2016 09:25:44 GMT
Hello,

Your program looks mostly fine, but there are a few minor things that
might help a bit:

Parallelism: In your attached flink-conf.yaml, you have 2 task slots
per task manager, so with 1 task manager your total number of task
slots is also 2. However, your default parallelism is 6. In Flink, the
recommended default parallelism is exactly the total number of task
slots [1]. (This is in contrast to Spark, where the recommended
setting is 2-3 tasks per CPU core [2].)
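
To make the slot arithmetic concrete, here is a minimal Scala sketch,
assuming the 1 task manager with 2 slots described above. The names
ParallelismSketch and totalSlots are made up for illustration;
setParallelism and getParallelism are the usual ExecutionEnvironment
calls.

```scala
import org.apache.flink.api.scala._

object ParallelismSketch {
  // Total task slots = number of task managers * slots per task manager.
  def totalSlots(taskManagers: Int, slotsPerTaskManager: Int): Int =
    taskManagers * slotsPerTaskManager

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Assumed setup from this thread: 1 task manager, 2 slots each.
    env.setParallelism(totalSlots(taskManagers = 1, slotsPerTaskManager = 2))

    println(s"parallelism = ${env.getParallelism}")
  }
}
```

Setting parallelism.default: 2 in flink-conf.yaml should have the same
effect without touching the program.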

CSV reading: If your input is a CSV file, then you should use
readCsvFile (instead of readTextFile and then parsing it manually).
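
As a rough illustration of the readCsvFile suggestion, the sketch below
reads only the movieId and rating columns as a tuple, assuming the
userId,movieId,rating,timestamp schema from the original message. The
object name, the helper name, and the path in main are hypothetical.

```scala
import org.apache.flink.api.scala._

object CsvReadSketch {
  // Reads columns 1 (movieId) and 2 (rating); userId and timestamp are
  // skipped by the CSV input format instead of being parsed and discarded.
  def readMovieRatings(env: ExecutionEnvironment,
                       path: String): DataSet[(String, Double)] =
    env.readCsvFile[(String, Double)](path, includedFields = Array(1, 2))

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical path; substitute the real location of the ratings file.
    val ratings = readMovieRatings(env, "file:///tmp/ratings.csv")

    // print() triggers execution and shows a few parsed records.
    ratings.first(5).print()
  }
}
```

Since the job only needs movieId and rating, skipping the other two
columns also avoids building a Timestamp for every record.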

Collect call: How large is the DataSet that you are calling collect on?
If it is large, then we might try to figure out a way to get the top
10 elements without first collecting the whole DataSet.
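
One possible direction (only a sketch, not something tested on this
job): keep a bounded top-k per partition, then merge the small
per-partition results. The code below shows the idea on plain Scala
collections, with each inner Seq standing in for one partition.
TopKSketch, topK and globalTopK are hypothetical names. The ordering
mirrors the original program: highest average first, ties broken by
movieID.

```scala
import scala.collection.mutable

object TopKSketch {
  // Rank order: higher average first, ties broken by smaller movieID.
  // "Smaller" under this ordering means "ranked better".
  val byRank: Ordering[(String, Double)] = new Ordering[(String, Double)] {
    def compare(a: (String, Double), b: (String, Double)): Int = {
      val c = java.lang.Double.compare(b._2, a._2)
      if (c != 0) c else a._1.compareTo(b._1)
    }
  }

  // Scans one partition while holding at most k + 1 elements in memory.
  def topK(it: Iterator[(String, Double)], k: Int): Seq[(String, Double)] = {
    // PriorityQueue dequeues its maximum, which under byRank is the
    // worst-ranked element currently kept.
    val heap = mutable.PriorityQueue.empty[(String, Double)](byRank)
    it.foreach { x =>
      heap.enqueue(x)
      if (heap.size > k) heap.dequeue()
    }
    heap.toList.sorted(byRank)
  }

  // Local top-k per partition first, then a final top-k over the merged
  // candidates (at most k per partition).
  def globalTopK(partitions: Seq[Seq[(String, Double)]],
                 k: Int): Seq[(String, Double)] =
    topK(partitions.iterator.flatMap(p => topK(p.iterator, k)), k)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(("a", 4.0), ("b", 5.0), ("c", 3.0)),
      Seq(("d", 5.0), ("e", 1.0)))
    globalTopK(partitions, 2).foreach(println)
  }
}
```

In a Flink job, a function like topK could presumably be applied inside
mapPartition before collect, so that at most 10 elements per partition
reach the client.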

Best,
Gábor

[1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
[2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism





2016-11-16 22:38 GMT+01:00 CPC <achalil@gmail.com>:
> Hi all,
>
> I am trying to compare Spark and Flink batch performance. In my test I am
> using ratings.csv from the
> http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also
> concatenated ratings.csv 16 times to increase the dataset size (390465536
> records in total, almost 10 GB). I am reading from Google Storage with
> gcs-connector, and the file schema is: userId,movieId,rating,timestamp.
> Basically, I am calculating the average rating per movie.
>
> Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>>
>> case class Rating(userID: String, movieID: String, rating: Double,
>>                   date: Timestamp)
>>
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble,
>>     new Timestamp((arr(3).toLong * 1000)))
>> }
>>
>> val ratings: DataSet[Rating] = env
>>   .readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>   .map(a => parseRating(a))
>> ratings
>>   .map(i => (i.movieID, 1, i.rating))
>>   .groupBy(0)
>>   .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>>   .map(i => (i._1, i._3 / i._2))
>>   .collect()
>>   .sortBy(_._1)
>>   .sortBy(_._2)(Ordering.Double.reverse)
>>   .take(10)
>
>
> With CombineHint.HASH: 3m49s; with CombineHint.SORT: 5m9s.
>
> Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>>
>> case class Rating(userID: String, movieID: String, rating: Double,
>>                   date: Timestamp)
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble,
>>     new Timestamp((arr(3).toLong * 1000)))
>> }
>> val conf = new SparkConf().setAppName("Simple Application")
>> val sc = new SparkContext(conf)
>> val keyed: RDD[(String, (Int, Double))] = sc
>>   .textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>   .map(parseRating)
>>   .map(r => (r.movieID, (1, r.rating)))
>> keyed
>>   .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
>>   .mapValues(i => i._2 / i._1)
>>   .collect
>>   .sortBy(_._1)
>>   .sortBy(a => a._2)(Ordering.Double.reverse)
>>   .take(10)
>>   .foreach(println)
>
>
> With reduceByKeyLocally: 2.9 minutes (about 2m54s); with reduceByKey: 3.1
> minutes (about 3m6s).
>
> Machine config on Google Cloud:
> taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
> jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
> Java version: jdk-8u102
> Flink: 1.1.3
> Spark: 2.0.2
>
> I also attached flink-conf.yaml. Although it is not such a big difference,
> there is a 40% performance difference between Spark and Flink. Is there
> something I am doing wrong? If not, how can I fine-tune Flink, or is it
> normal for Spark to perform better on batch data?
>
> Thank you in advance...
