spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Does using Custom Partitioner before calling reduceByKey improve performance?
Date Wed, 28 Oct 2015 02:20:10 GMT
If it is streaming, you can look at updateStateByKey for maintaining active
sessions. But that won't work for batch.
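
A minimal sketch of what that could look like, assuming a DStream keyed by
sessionId with (timestamp, json) values. The names SessionState, updateSession
and activeSessions, and the 30-minute idle timeout, are hypothetical and not
from this thread. updateStateByKey also needs checkpointing enabled on the
StreamingContext.

```scala
import org.apache.spark.streaming.dstream.DStream

object SessionState {
  // Hypothetical event shape: (timestamp, json)
  type Event = (Long, String)

  // Assumed idle timeout; not from the thread
  val timeoutMs: Long = 30 * 60 * 1000L

  // Pure update function in the shape updateStateByKey expects:
  // (values seen in this batch, previous state) => new state.
  // Returning None removes the key from the state.
  def updateSession(now: Long)(newEvents: Seq[Event],
                               state: Option[List[Event]]): Option[List[Event]] = {
    val merged = (state.getOrElse(Nil) ++ newEvents).sortBy(_._1)
    // Drop sessions that have no events or whose last event is older than the timeout
    if (merged.isEmpty || now - merged.last._1 > timeoutMs) None else Some(merged)
  }

  // Wiring, assuming `events` is keyed by sessionId and checkpointing is enabled
  def activeSessions(events: DStream[(String, Event)]): DStream[(String, List[Event])] =
    events.updateStateByKey[List[Event]](
      (newEvents: Seq[Event], state: Option[List[Event]]) =>
        updateSession(System.currentTimeMillis())(newEvents, state))
}
```

Keeping the whole event list as state is only reasonable while sessions stay
small; a real job would probably keep a compact summary instead.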

As I answered before, a custom partitioner can improve performance if you
change the partitioning scheme from hash-based to something else. It's hard to
say anything beyond that without understanding the data skew and other details
of your application. Before jumping into that, you should simply change the
number of partitions and see if the performance improves.
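
A rough sketch of both options, assuming an RDD of (metricName, count) pairs.
HotKeyPartitioner and ReduceTuning are hypothetical names, and the idea of
giving a few known heavy keys their own partitions is just one illustrative
scheme, not something measured on this workload.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Hypothetical partitioner: each known heavy key gets a dedicated partition,
// everything else is hashed over the remaining partitions.
class HotKeyPartitioner(hotKeys: Seq[String], val otherPartitions: Int) extends Partitioner {
  require(otherPartitions > 0, "need at least one partition for non-hot keys")
  val hot: Vector[String] = hotKeys.distinct.toVector
  private val hotIndex: Map[String, Int] = hot.zipWithIndex.toMap
  private val fallback = new HashPartitioner(otherPartitions)

  override def numPartitions: Int = hot.size + otherPartitions

  override def getPartition(key: Any): Int = key match {
    case k: String if hotIndex.contains(k) => hotIndex(k)
    case other => hot.size + fallback.getPartition(other)
  }

  // Spark compares partitioners with equals to decide whether a shuffle can be skipped
  override def equals(other: Any): Boolean = other match {
    case p: HotKeyPartitioner => p.hot == hot && p.otherPartitions == otherPartitions
    case _ => false
  }
  override def hashCode: Int = 31 * hot.hashCode + otherPartitions
}

object ReduceTuning {
  // First thing to try: same hash partitioning, just more reduce partitions
  def withMorePartitions(rdd: RDD[(String, Long)], n: Int): RDD[(String, Long)] =
    rdd.reduceByKey((a: Long, b: Long) => a + b, n)

  // Only if skew remains: hand the custom partitioner to reduceByKey directly
  def withCustomPartitioner(rdd: RDD[(String, Long)], p: Partitioner): RDD[(String, Long)] =
    rdd.reduceByKey(p, (a: Long, b: Long) => a + b)
}
```

Passing the partitioner to reduceByKey, rather than calling partitionBy first,
lets Spark combine values on the map side before the shuffle; partitionBy on
its own shuffles every raw record.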



On Tue, Oct 27, 2015 at 7:10 PM, swetha kasireddy <swethakasireddy@gmail.com> wrote:

> After sorting the list of grouped events, I need an RDD whose key is the
> sessionId and whose value is a list sorted by timeStamp for each input
> Json. So the return type would be RDD[(String, List[(Long, String)])],
> where the key is the sessionId and the value is a list of (timeStamp, Json)
> tuples. I will need to use groupByKey to group by sessionId, do a secondary
> sort by timeStamp, and then get the list of Json values in sorted order. Is
> there any alternative to that? Please find the code I used below.
>
>
> Also, does using a customPartitioner for a reduceByKey improve performance?
>
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
>   val grpdRecs = rdd.groupByKey()
>   val srtdRecs = grpdRecs.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
>   srtdRecs
> }
>
>
> On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <tdas@databricks.com>
> wrote:
>
>> If you specify the same partitioner (custom or otherwise) for both
>> partitionBy and groupBy, then maybe it will help. The fundamental problem
>> is groupByKey, which takes a lot of working memory.
>> 1. Try to avoid groupByKey. What is it that you want to do after sorting the
>> list of grouped events? Can you do that operation with a reduceByKey?
>> 2. If not, use more partitions. That puts less data in each
>> partition, so there is less spilling.
>> 3. You can control the amount of memory allocated for shuffles by changing
>> the configuration spark.shuffle.memoryFraction. A larger fraction would cause
>> less spilling.
>>
>>
>> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <swethakasireddy@gmail.com> wrote:
>>
>>> So, wouldn't using a customPartitioner on the RDD upon which the
>>> groupByKey or reduceByKey is performed avoid shuffles and improve
>>> performance? My code does groupByAndSort and reduceByKey on different
>>> datasets as shown below. Would using a custom partitioner on those datasets
>>> before using groupByKey or reduceByKey improve performance? My idea is
>>> to avoid shuffles. Also, right now I see a lot of spills when there is a
>>> very large dataset for groupByKey and reduceByKey. I think the memory is
>>> not sufficient. We need to group by sessionId and then sort the Jsons by
>>> timeStamp as shown in the code below.
>>>
>>>
>>> What is the alternative to using groupByKey for better performance? And
>>> in the case of reduceByKey, would using a customPartitioner on the RDD upon
>>> which the reduceByKey is performed reduce the shuffles and improve
>>> the performance?
>>>
>>>
>>> rdd.partitionBy(customPartitioner)
>>>
>>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
>>>   val grpdRecs = rdd.groupByKey()
>>>   val srtdRecs = grpdRecs.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
>>>   srtdRecs
>>> }
>>>
>>> rdd.reduceByKey((a, b) => {
>>>   (Math.max(a._1, b._1), (a._2 ++ b._2))
>>> })
>>>
>>>
>>>
>>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> If you just want to control the number of reducers, then setting
>>>> numPartitions is sufficient. If you want to control the exact partitioning
>>>> scheme (that is, some scheme other than hash-based), then you need to
>>>> implement a custom partitioner. It can be used to mitigate data skew, etc.,
>>>> which ultimately improves performance.
>>>>
>>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasireddy@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We currently use reduceByKey to reduce by a particular metric name in
>>>>> our Streaming/Batch job. It seems to be doing a lot of shuffles, and
>>>>> that has an impact on performance. Does using a custom partitioner
>>>>> before calling reduceByKey improve performance?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Swetha
>>>>
>>>
>>
>
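
On the quoted question about an alternative to groupByKey followed by an
in-memory sort: one commonly used option is repartitionAndSortWithinPartitions
with a composite (sessionId, timeStamp) key, so the sort happens during the
shuffle. The sketch below assumes the same RDD[(String, (Long, String))] input
as the quoted code; SessionPartitioner and SecondarySort are hypothetical names.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Partitions a composite (sessionId, timeStamp) key by sessionId only,
// so all events of a session land in the same partition.
class SessionPartitioner(partitions: Int) extends Partitioner {
  private val delegate = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (sessionId: String, _) => delegate.getPartition(sessionId)
    case other => delegate.getPartition(other)
  }
}

object SecondarySort {
  // Within each output partition, records are ordered by (sessionId, timeStamp),
  // so each session's events are contiguous and already in time order.
  def sortedBySession(rdd: RDD[(String, (Long, String))],
                      partitions: Int): RDD[((String, Long), String)] =
    rdd
      .map { case (sessionId, (ts, json)) => ((sessionId, ts), json) }
      .repartitionAndSortWithinPartitions(new SessionPartitioner(partitions))
}
```

The result can then be walked with mapPartitions, one session at a time,
without first building a full list per key in memory.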
