flink-user mailing list archives

From: CPC <acha...@gmail.com>
Subject: Re: spark vs flink batch performance
Date: Fri, 18 Nov 2016 07:26:50 GMT
Hi all,

In the meantime I now have three workers. Any thoughts on improving Flink
performance?
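
In case it is relevant, these are the job-side knobs I was planning to try
next (an untested sketch; the parallelism value of 6 is only illustrative,
assuming 3 workers with 2 vCPUs each):

import org.apache.flink.api.scala.ExecutionEnvironment

// Untested sketch: explicit parallelism and object reuse for the DataSet job.
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(6)               // illustrative only: 3 workers x 2 vCPUs
env.getConfig.enableObjectReuse()   // skip defensive copies between chained operators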

Thank you...

On Nov 17, 2016 00:38, "CPC" <achalil@gmail.com> wrote:

> Hi all,
>
> I am trying to compare Spark and Flink batch performance. In my test I am
> using ratings.csv from the MovieLens dataset at
> http://files.grouplens.org/datasets/movielens/ml-latest.zip. I also
> concatenated ratings.csv 16 times to increase the dataset size (390465536
> records in total, almost 10 GB). I am reading from Google Cloud Storage
> with the gcs-connector, and the file schema is:
> userId,movieId,rating,timestamp. Basically, I am calculating the average
> rating per movie.
>
> Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>
>> case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>> }
>>
>> // (movieId, count, ratingSum) per record, summed per movie, then averaged
>> val ratings: DataSet[Rating] = env
>>   .readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>   .map(a => parseRating(a))
>> ratings
>>   .map(i => (i.movieID, 1, i.rating))
>>   .groupBy(0)
>>   .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>>   .map(i => (i._1, i._3 / i._2))
>>   .collect().sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse).take(10)
>
>
> With CombineHint.HASH it took 3m49s and with CombineHint.SORT 5m9s.
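>
> As a variant I could also try (an untested sketch on my side, and not
> necessarily faster, since the built-in aggregations pick their own combine
> strategy), the same average can be expressed with Flink's field aggregations
> instead of the hand-written reduce, reusing the ratings DataSet from above:
>
>> // Untested sketch: built-in SUM aggregations instead of a manual reduce.
>> val avgPerMovie = ratings
>>   .map(i => (i.movieID, 1, i.rating))   // (movieId, count, ratingSum)
>>   .groupBy(0)
>>   .sum(1).andSum(2)                     // sum counts and ratings per movie
>>   .map(i => (i._1, i._3 / i._2))        // average rating per movie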
>
> Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>
>> case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>> }
>>
>> val conf = new SparkConf().setAppName("Simple Application")
>> val sc = new SparkContext(conf)
>> // (movieId, (count, ratingSum)) per record, summed per movie, then averaged
>> val keyed: RDD[(String, (Int, Double))] = sc
>>   .textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>   .map(parseRating)
>>   .map(r => (r.movieID, (1, r.rating)))
>> keyed.reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2)).mapValues(i => i._2 / i._1)
>>   .collect.sortBy(_._1).sortBy(a => a._2)(Ordering.Double.reverse).take(10).foreach(println)
>
>
> With reduceByKeyLocally it took about 2.9 minutes (almost 2m54s) and with
> reduceByKey about 3.1 minutes (almost 3m6s).
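> (For rough context, taking the ~10 GB figure above at face value: 10240 MB /
> 174 s is roughly 59 MB/s for the reduceByKeyLocally run, versus 10240 MB /
> 229 s, roughly 45 MB/s, for the Flink CombineHint.HASH run.)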
>
> Machine config on Google Cloud:
> taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
> jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
> Java version: JDK 8u102
> Flink: 1.1.3
> Spark: 2.0.2
>
> I also attached flink-conf.yaml. Although it is not such a big difference,
> there is roughly a 40% performance gap between Spark and Flink. Is there
> something I am doing wrong? If not, how can I fine-tune Flink, or is it
> normal for Spark to perform better on batch data?
>
> Thank you in advance...
>
