flink-user mailing list archives

From: CPC <acha...@gmail.com>
Subject: spark vs flink batch performance
Date: Wed, 16 Nov 2016 21:38:06 GMT
Hi all,

I am trying to compare Spark and Flink batch performance. In my test I am
using ratings.csv from the MovieLens dataset at
http://files.grouplens.org/datasets/movielens/ml-latest.zip. I also
concatenated ratings.csv 16 times to increase the dataset size (390,465,536
records in total, almost 10 GB). I am reading from Google Cloud Storage with
the gcs-connector, and the file schema is userId,movieId,rating,timestamp.
Basically, I am calculating the average rating per movie.
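To make the expected result concrete, here is a minimal plain-Scala sketch
of the same average-rating logic over a few made-up input lines (the sample
values are hypothetical):

// Hypothetical sample lines in the userId,movieId,rating,timestamp schema.
val lines = Seq(
  "1,10,4.0,1462500000",
  "2,10,3.0,1462500060",
  "3,20,5.0,1462500120"
)

// Average rating per movie: group by movieId, then mean of the ratings.
val avgByMovie = lines
  .map(_.split(","))
  .groupBy(arr => arr(1))
  .mapValues(rows => rows.map(_(2).toDouble).sum / rows.size)

// Prints (10,3.5) and (20,5.0), in some order.
avgByMovie.foreach(println)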

Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):

import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint

case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)

def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}

val env = ExecutionEnvironment.getExecutionEnvironment

val ratings: DataSet[Rating] = env
  .readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
  .map(a => parseRating(a))

ratings
  .map(i => (i.movieID, 1, i.rating))          // (movieId, count, ratingSum)
  .groupBy(0)
  .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
  .map(i => (i._1, i._3 / i._2))               // (movieId, average rating)
  .collect()
  .sortBy(_._1)                                // stable sort: ties broken by movieId
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
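The CombineHint.SORT run is identical except for the hint passed to reduce;
for reference, only the aggregation step changes (a sketch reusing the
ratings DataSet above):

// Same aggregation with the sort-based combiner instead of the hash-based one.
ratings
  .map(i => (i.movieID, 1, i.rating))
  .groupBy(0)
  .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.SORT)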


With CombineHint.HASH the job took 3m49s; with CombineHint.SORT it took 5m9s.

Code for Spark (I tested reduceByKey and reduceByKeyLocally):

import java.sql.Timestamp
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)

def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

val keyed: RDD[(String, (Int, Double))] = sc
  .textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
  .map(parseRating)
  .map(r => (r.movieID, (1, r.rating)))        // (movieId, (count, ratingSum))

keyed
  .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
  .mapValues(i => i._2 / i._1)                 // average rating per movie
  .collect()
  .sortBy(_._1)                                // stable sort: ties broken by movieId
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
  .foreach(println)
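For the reduceByKeyLocally run, the aggregation is the same but the result
comes back to the driver as a scala.collection.Map, so no collect() is
needed; a sketch reusing the keyed RDD above:

// reduceByKeyLocally merges values map-side and returns a Map[K, V]
// directly to the driver instead of an RDD.
val avgByMovie: scala.collection.Map[String, Double] = keyed
  .reduceByKeyLocally((l, r) => (l._1 + r._1, l._2 + r._2))
  .mapValues(i => i._2 / i._1)

avgByMovie.toSeq
  .sortBy(_._1)
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
  .foreach(println)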


With reduceByKeyLocally the job took 2.9 minutes (about 2m54s), and with
reduceByKey 3.1 minutes (about 3m6s).

Machine config on Google Cloud:
jobmanager / Spark master: n1-standard-1 (1 vCPU, 3.75 GB memory)
taskmanagers / Spark workers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
Java version: JDK 8u102
Flink: 1.1.3
Spark: 2.0.2

I have also attached my flink-conf.yaml. Although it is not a huge gap,
there is about a 40% performance difference between Spark and Flink. Is
there something I am doing wrong? If not, how can I fine-tune Flink, or is
it normal for Spark to perform better on batch workloads?

Thank you in advance...
