Hi all,

I am trying to compare Spark and Flink batch performance. For my test I am using ratings.csv from the MovieLens dataset at http://files.grouplens.org/datasets/movielens/ml-latest.zip. I also concatenated ratings.csv 16 times to increase the dataset size (390,465,536 records in total, almost 10 GB). I am reading from Google Cloud Storage via the gcs-connector, and the file schema is: userId,movieId,rating,timestamp. Basically, I am calculating the average rating per movie.
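To pin down the semantics, here is a framework-free Scala sketch of the same computation on a few made-up lines (the sample values are illustrative only). Both jobs below compute this same map distributed, carrying a (count, sum) pair so the average can be combined incrementally:

```scala
// Parse "userId,movieId,rating,timestamp" lines and average ratings per movie,
// using plain Scala collections instead of a distributed DataSet/RDD.
val lines = Seq(
  "1,10,4.0,1111111111",
  "2,10,3.0,1111111112",
  "3,20,5.0,1111111113"
)

val avgPerMovie: Map[String, Double] = lines
  .map(_.split(","))
  .map(a => (a(1), a(2).toDouble))                                  // (movieId, rating)
  .groupBy(_._1)                                                    // group all ratings per movie
  .map { case (movie, rs) => (movie, rs.map(_._2).sum / rs.size) }  // average per movie
```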

Code for Flink (I tested both CombineHint.HASH and CombineHint.SORT):
case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
 
def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong * 1000)))
}
 
val ratings: DataSet[Rating] = env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a => parseRating(a))
ratings
  .map(i => (i.movieID, 1, i.rating))
  .groupBy(0)
  .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
  .map(i => (i._1, i._3 / i._2))
  .collect()
  .sortBy(_._1)
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)

With CombineHint.HASH it takes 3m49s, and with CombineHint.SORT 5m9s.
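For context, my understanding (an assumption on my part, from reading about the combine strategies) is that the HASH hint pre-aggregates each partition in a hash table, while the SORT hint sorts records by key first and then combines runs of equal keys. A plain-Scala toy model of the two strategies:

```scala
import scala.collection.mutable

// Toy model of a hash-based combiner: one pass, aggregating (count, ratingSum)
// per key in a hash map.
def hashCombine(records: Seq[(String, Int, Double)]): Map[String, (Int, Double)] = {
  val acc = mutable.HashMap.empty[String, (Int, Double)]
  for ((movie, cnt, rating) <- records) {
    val (c, s) = acc.getOrElse(movie, (0, 0.0))
    acc(movie) = (c + cnt, s + rating)
  }
  acc.toMap
}

// Toy model of a sort-based combiner: an extra O(n log n) sort by key,
// then folding adjacent records with equal keys.
def sortCombine(records: Seq[(String, Int, Double)]): Map[String, (Int, Double)] =
  records.sortBy(_._1)
    .foldLeft(Map.empty[String, (Int, Double)]) { case (m, (movie, cnt, rating)) =>
      val (c, s) = m.getOrElse(movie, (0, 0.0))
      m.updated(movie, (c + cnt, s + rating))
    }
```

Both produce the same result; with relatively few distinct movieIds against ~390M records the hash table stays small, which would be consistent with HASH beating SORT here.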

Code for Spark (I tested both reduceByKey and reduceByKeyLocally):
case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong * 1000)))
}
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val keyed: RDD[(String, (Int, Double))] = sc
  .textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
  .map(parseRating)
  .map(r => (r.movieID, (1, r.rating)))

keyed
  .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
  .mapValues(i => i._2 / i._1)
  .collect()
  .sortBy(_._1)
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
  .foreach(println)

With reduceByKeyLocally it takes about 2.9 minutes (roughly 2m54s), and with reduceByKey about 3.1 minutes (roughly 3m6s).
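For completeness, this is the reduceByKeyLocally variant I timed (a sketch, so treat the exact chaining as an assumption); unlike reduceByKey, it returns a Map on the driver instead of an RDD:

```scala
// reduceByKeyLocally merges (count, ratingSum) pairs per key and returns a
// scala.collection.Map on the driver, so no collect() is needed afterwards.
val localAvgs = keyed
  .reduceByKeyLocally((l, r) => (l._1 + r._1, l._2 + r._2))
  .mapValues(i => i._2 / i._1)

localAvgs.toSeq
  .sortBy(_._1)
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
  .foreach(println)
```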

Machine config on Google Cloud:
taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
Java version: JDK 8u102
Flink: 1.1.3
Spark: 2.0.2

I also attached my flink-conf.yaml. Although it is not a huge difference, there is roughly a 40% performance gap between Spark and Flink. Is there something I am doing wrong? If not, how can I fine-tune Flink, or is it normal for Spark to perform better on batch workloads?

Thank you in advance.